From 1427a8f3b0d95ef7dff1500b32fb570e7feb270b Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 9 Sep 2022 19:03:42 +0800 Subject: [PATCH] enh: support client query policy --- include/libs/executor/executor.h | 9 +- include/libs/nodes/plannodes.h | 1 + include/libs/qworker/qworker.h | 23 ++++- source/client/src/clientImpl.c | 6 +- source/client/src/clientMain.c | 2 - source/common/src/tglobal.c | 2 +- source/dnode/vnode/src/sma/smaRollup.c | 2 +- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/executor.c | 7 +- source/libs/executor/src/executorimpl.c | 69 ++++++++------ source/libs/nodes/src/nodesCloneFuncs.c | 1 + source/libs/qworker/inc/qwInt.h | 18 +--- source/libs/qworker/inc/qwMsg.h | 2 +- source/libs/qworker/src/qwMsg.c | 4 +- source/libs/qworker/src/qworker.c | 117 ++++++++++++++++++----- source/libs/scheduler/CMakeLists.txt | 2 +- source/libs/scheduler/inc/schInt.h | 4 +- source/libs/scheduler/src/schRemote.c | 118 ++++++++++++++---------- source/libs/scheduler/src/schTask.c | 48 ++++++++-- source/libs/scheduler/src/scheduler.c | 4 + 20 files changed, 301 insertions(+), 139 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 25a6221fcb..37f89caa41 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -29,6 +29,13 @@ typedef void* DataSinkHandle; struct SRpcMsg; struct SSubplan; +typedef int32_t (*localFetchFp)(void *handle, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp); + +typedef struct { + void *handle; + localFetchFp fp; +} SLocalFetch; + typedef struct { void* tqReader; void* meta; @@ -127,7 +134,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table * @param handle * @return */ -int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds); +int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch *pLocal); int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds); /** diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 6fd6a316eb..5abad5cccc 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -393,6 +393,7 @@ typedef struct SDownstreamSourceNode { uint64_t schedId; int32_t execId; int32_t fetchMsgType; + bool localExec; } SDownstreamSourceNode; typedef struct SExchangePhysiNode { diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index b83ffe4db2..6062ffa3ef 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -56,6 +56,23 @@ typedef struct { uint64_t numOfErrors; } SQWorkerStat; +typedef struct SQWMsgInfo { + int8_t taskType; + int8_t explain; + int8_t needFetch; +} SQWMsgInfo; + +typedef struct SQWMsg { + void *node; + int32_t code; + int32_t msgType; + void *msg; + int32_t msgLen; + SQWMsgInfo msgInfo; + SRpcHandleInfo connInfo; +} SQWMsg; + + int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb); int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg); @@ -78,10 +95,14 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes); -void qWorkerDestroy(void **qWorkerMgmt); +void qWorkerDestroy(void **qWorkerMgmt); int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat); +int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, SQWMsg *qwMsg); + +int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp); + #ifdef __cplusplus } #endif diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 54320445d4..6a8e58efb4 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -365,7 +365,7 @@ int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) { } bool qnodeRequired(SRequestObj* pRequest) { - if (QUERY_POLICY_VNODE == tsQueryPolicy) { + if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) { return false; } @@ -689,7 +689,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList .requestObjRefId = pRequest->self}; SSchedulerReq req = { .syncReq = true, - .localReq = (tsQueryPolicy == CLIENT_HANDLE), + .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT), .pConn = &conn, .pNodeList = pNodeList, .pDag = pDag, @@ -1065,7 +1065,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM .pTrans = pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self}; SSchedulerReq req = { .syncReq = false, - .localReq = (tsQueryPolicy == CLIENT_HANDLE), + .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT), .pConn = &conn, .pNodeList = pNodeList, .pDag = pDag, diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 9c435887e6..73ff36a299 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -76,8 +76,6 @@ void taos_cleanup(void) { cleanupTaskQueue(); - qWorkerDestroy(&tscQueryMgmt); - taosConvDestroy(); tscInfo("all local resources released"); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 47110237dd..add19ae337 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -285,7 +285,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "shellActivityTimer", tsShellActivityTimer, 1, 120, 1) != 0) return -1; if (cfgAddInt32(pCfg, "compressMsgSize", tsCompressMsgSize, -1, 100000000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, 1) != 0) return -1; - if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1; + if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 4, 1) != 0) return -1; if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1; if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, true) != 0) return -1; if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index f2063e3067..d184926817 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -641,7 +641,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma while (1) { uint64_t ts; - int32_t code = qExecTaskOpt(taskInfo, pResList, &ts); + int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, NULL); if (code < 0) { if (code == TSDB_CODE_QRY_IN_EXEC) { break; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 89e5b2da0a..1dfa9da7d5 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -183,6 +183,7 @@ typedef struct SExecTaskInfo { EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] SSubplan* pSubplan; struct SOperatorInfo* pRoot; + SLocalFetch localFetch; } SExecTaskInfo; enum { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 278f02b228..93303cb62d 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -450,10 +450,15 @@ static void freeBlock(void* param) { blockDataDestroy(pBlock); } -int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) { +int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch* pLocal) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; int64_t threadId = taosGetSelfPthreadId(); + if (pLocal) { + pTaskInfo->localFetch.handle = pLocal->handle; + pTaskInfo->localFetch.fp = pLocal->fp; + } + taosArrayClearEx(pResList, freeBlock); int64_t curOwner = 0; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b53d35a1a1..a1f3e5a4a5 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1999,38 +1999,45 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY); - qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId, - sourceIndex, totalSources); - - pMsg->header.vgId = htonl(pSource->addr.nodeId); - pMsg->sId = htobe64(pSource->schedId); - pMsg->taskId = htobe64(pSource->taskId); - pMsg->queryId = htobe64(pTaskInfo->id.queryId); - pMsg->execId = htonl(pSource->execId); - - // send the fetch remote task result reques - SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); - if (NULL == pMsgSendInfo) { - taosMemoryFreeClear(pMsg); - qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); - pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; - return pTaskInfo->code; - } - SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper)); pWrapper->exchangeId = pExchangeInfo->self; pWrapper->sourceIndex = sourceIndex; - pMsgSendInfo->param = pWrapper; - pMsgSendInfo->paramFreeFp = taosMemoryFree; - pMsgSendInfo->msgInfo.pData = pMsg; - pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq); - pMsgSendInfo->msgType = pSource->fetchMsgType; - pMsgSendInfo->fp = loadRemoteDataCallback; + if (pSource->localExec) { + SDataBuf pBuf = {0}; + int32_t code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId, pSource->taskId, 0, pSource->execId, &pBuf.pData); + loadRemoteDataCallback(pWrapper, &pBuf, code); + } else { + qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId, + sourceIndex, totalSources); + + pMsg->header.vgId = htonl(pSource->addr.nodeId); + pMsg->sId = htobe64(pSource->schedId); + pMsg->taskId = htobe64(pSource->taskId); + pMsg->queryId = htobe64(pTaskInfo->id.queryId); + pMsg->execId = htonl(pSource->execId); + + // send the fetch remote task result reques + SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + if (NULL == pMsgSendInfo) { + taosMemoryFreeClear(pMsg); + qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + return pTaskInfo->code; + } + + pMsgSendInfo->param = pWrapper; + pMsgSendInfo->paramFreeFp = taosMemoryFree; + pMsgSendInfo->msgInfo.pData = pMsg; + pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq); + pMsgSendInfo->msgType = pSource->fetchMsgType; + pMsgSendInfo->fp = loadRemoteDataCallback; - int64_t transporterId = 0; - int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo); + int64_t transporterId = 0; + int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo); + } + return TSDB_CODE_SUCCESS; } @@ -3963,7 +3970,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { - pOperator = createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo); + pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; if (pHandle->vnode) { @@ -4474,8 +4481,10 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { destroyOperatorInfo(pTaskInfo->pRoot); cleanupTableSchemaInfo(&pTaskInfo->schemaInfo); - nodesDestroyNode((SNode*)pTaskInfo->pSubplan); - + if (!pTaskInfo->localFetch.fp) { + nodesDestroyNode((SNode*)pTaskInfo->pSubplan); + } + taosMemoryFreeClear(pTaskInfo->sql); taosMemoryFreeClear(pTaskInfo->id.str); taosMemoryFreeClear(pTaskInfo); diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 83bccbffb4..904b8f01eb 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -609,6 +609,7 @@ static int32_t downstreamSourceCopy(const SDownstreamSourceNode* pSrc, SDownstre COPY_SCALAR_FIELD(schedId); COPY_SCALAR_FIELD(execId); COPY_SCALAR_FIELD(fetchMsgType); + COPY_SCALAR_FIELD(localExec); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 8efc247524..150dde91e0 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -83,22 +83,6 @@ typedef struct SQWDebug { extern SQWDebug gQWDebug; -typedef struct SQWMsgInfo { - int8_t taskType; - int8_t explain; - int8_t needFetch; -} SQWMsgInfo; - -typedef struct SQWMsg { - void *node; - int32_t code; - int32_t msgType; - void *msg; - int32_t msgLen; - SQWMsgInfo msgInfo; - SRpcHandleInfo connInfo; -} SQWMsg; - typedef struct SQWHbParam { bool inUse; int32_t qwrId; @@ -133,6 +117,7 @@ typedef struct SQWTaskCtx { int8_t taskType; int8_t explain; int8_t needFetch; + int8_t localExec; int32_t msgType; int32_t fetchType; int32_t execId; @@ -150,6 +135,7 @@ typedef struct SQWTaskCtx { int8_t events[QW_EVENT_MAX]; + SArray *explainRes; void *taskHandle; void *sinkHandle; STbVerInfo tbInfo; diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h index 3ff5b5950f..b46c5d6baf 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -42,7 +42,7 @@ int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t c int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray* pExecList); int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code); void qwFreeFetchRsp(void *msg); -int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); +int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **rsp); int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index ecc25e9c2d..2acf3fad15 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -9,10 +9,10 @@ #include "tmsg.h" #include "tname.h" -int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) { +int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **rsp) { int32_t msgSize = sizeof(SRetrieveTableRsp) + length; - SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcReallocCont(*rsp, msgSize); + SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)(rpcMalloc ? rpcReallocCont(*rsp, msgSize) : taosMemoryRealloc(*rsp, msgSize)); if (NULL == pRsp) { qError("rpcMallocCont %d failed", msgSize); QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 77c8aa5cd9..00588977bc 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -57,11 +57,15 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo)); QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList)); - SRpcHandleInfo connInfo = ctx->ctrlConnInfo; - connInfo.ahandle = NULL; - int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList); - taosArrayDestroyEx(execInfoList, freeItem); - QW_ERR_RET(code); + if (ctx->localExec) { + + } else { + SRpcHandleInfo connInfo = ctx->ctrlConnInfo; + connInfo.ahandle = NULL; + int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList); + taosArrayDestroyEx(execInfoList, freeItem); + QW_ERR_RET(code); + } } if (!ctx->needFetch) { @@ -80,6 +84,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { int32_t execNum = 0; qTaskInfo_t taskHandle = ctx->taskHandle; DataSinkHandle sinkHandle = ctx->sinkHandle; + SLocalFetch localFetch = {(void*)mgmt, qWorkerProcessLocalFetch}; SArray *pResList = taosArrayInit(4, POINTER_BYTES); while (true) { @@ -88,7 +93,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { // if *taskHandle is NULL, it's killed right now if (taskHandle) { qwDbgSimulateSleep(); - code = qExecTaskOpt(taskHandle, pResList, &useconds); + code = qExecTaskOpt(taskHandle, pResList, &useconds, &localFetch); if (code) { if (code != TSDB_CODE_OPS_NOT_SUPPORT) { QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); @@ -229,7 +234,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); if (NULL == rsp) { - QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); + QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, len, &rsp)); *pOutput = output; } else { pOutput->queryEnd = output.queryEnd; @@ -250,7 +255,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, *dataLen += len; - QW_ERR_RET(qwMallocFetchRsp(*dataLen, &rsp)); + QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &rsp)); output.pData = rsp->data + *dataLen - len; code = dsGetDataBlock(ctx->sinkHandle, &output); @@ -474,16 +479,18 @@ _return: } if (QW_PHASE_POST_QUERY == phase && ctx) { - ctx->queryRsped = true; - - bool rsped = false; - SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo}; - qwDbgSimulateRedirect(&qwMsg, ctx, &rsped); - qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); - if (!rsped) { - qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx); - QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); + if (!ctx->localExec) { + bool rsped = false; + SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo}; + qwDbgSimulateRedirect(&qwMsg, ctx, &rsped); + qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); + if (!rsped) { + qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx); + QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); + } } + + ctx->queryRsped = true; } if (ctx) { @@ -551,6 +558,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { ctx->explain = qwMsg->msgInfo.explain; ctx->needFetch = qwMsg->msgInfo.needFetch; ctx->msgType = qwMsg->msgType; + ctx->localExec = false; // QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg); @@ -1034,7 +1042,11 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S mgmt->nodeType = nodeType; mgmt->nodeId = nodeId; - mgmt->msgCb = pMsgCb ? *pMsgCb : NULL; + if (pMsgCb) { + mgmt->msgCb = *pMsgCb; + } else { + memset(&mgmt->msgCb, 0, sizeof(mgmt->msgCb)); + } mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt); if (mgmt->refId < 0) { @@ -1111,26 +1123,33 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessLocalQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { +int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, SQWMsg *qwMsg, SArray *explainRes) { + SQWorker *mgmt = (SQWorker*)pMgmt; int32_t code = 0; SQWTaskCtx *ctx = NULL; SSubplan *plan = (SSubplan *)qwMsg->msg; SQWPhaseInput input = {0}; qTaskInfo_t pTaskInfo = NULL; DataSinkHandle sinkHandle = NULL; + SReadHandle rHandle = {0}; QW_ERR_JRET(qwAddTaskCtx(QW_FPARAMS())); + QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); + QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL)); QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); - QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); - ctx->taskType = qwMsg->msgInfo.taskType; ctx->explain = qwMsg->msgInfo.explain; ctx->needFetch = qwMsg->msgInfo.needFetch; ctx->msgType = qwMsg->msgType; + ctx->localExec = true; + ctx->explainRes = explainRes; - code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH); + rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb)); + rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle; + + code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); QW_ERR_JRET(code); @@ -1149,6 +1168,12 @@ int32_t qWorkerProcessLocalQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { _return: + taosMemoryFree(rHandle.pMsgCb); + + input.code = code; + input.msgType = qwMsg->msgType; + code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); + if (ctx) { QW_UPDATE_RSP_CODE(ctx, code); qwReleaseTaskCtx(mgmt, ctx); @@ -1157,4 +1182,52 @@ _return: QW_RET(code); } +int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp, SArray* explainRes) { + SQWorker *mgmt = (SQWorker*)pMgmt; + int32_t code = 0; + int32_t dataLen = 0; + SQWTaskCtx *ctx = NULL; + void *rsp = NULL; + bool queryStop = false; + + SQWPhaseInput input = {0}; + + QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL)); + + QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); + + ctx->msgType = TDMT_SCH_MERGE_FETCH; + ctx->explainRes = explainRes; + + SOutputData sOutput = {0}; + + while (true) { + QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); + + if (NULL == rsp) { + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop)); + + continue; + } else { + bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); + + qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); + if (qComplete) { + atomic_store_8((int8_t *)&ctx->queryEnd, true); + } + + break; + } + } + +_return: + + *pRsp = rsp; + + input.code = code; + code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, NULL); + + QW_RET(code); +} + diff --git a/source/libs/scheduler/CMakeLists.txt b/source/libs/scheduler/CMakeLists.txt index 88180391dd..3288120b67 100644 --- a/source/libs/scheduler/CMakeLists.txt +++ b/source/libs/scheduler/CMakeLists.txt @@ -9,7 +9,7 @@ target_include_directories( target_link_libraries( scheduler - PUBLIC os util nodes planner qcom common catalog transport command + PUBLIC os util nodes planner qcom common catalog transport command qworker executor ) if(${BUILD_TEST}) diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 6292aac6d2..01d962a71f 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -264,7 +264,7 @@ typedef struct SSchJob { SHashObj *taskList; SHashObj *execTasks; // executing and executed tasks, key:taskid, value:SQueryTask* SHashObj *flowCtrl; // key is ep, element is SSchFlowControl - + SExplainCtx *explainCtx; int8_t status; SQueryNodeAddr resNode; @@ -305,6 +305,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_IS_DATA_BIND_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)) #define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum) #define SCH_IS_DATA_MERGE_TASK(task) (!SCH_IS_DATA_BIND_TASK(task)) +#define SCH_IS_LOCAL_EXEC_TASK(_job, _task) ((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && SCH_IS_DATA_MERGE_TASK(_task)) #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) @@ -503,6 +504,7 @@ void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode); int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode); int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode); bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync); +int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode); extern SSchDebug gSCHDebug; diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 5a64aaaebb..d79514a122 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -72,6 +72,71 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { return TSDB_CODE_SUCCESS; } +int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode) { + SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; + int32_t code = 0; + + SCH_ERR_JRET(rspCode); + + if (NULL == msg) { + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + if (SCH_IS_EXPLAIN_JOB(pJob)) { + if (rsp->completed) { + SRetrieveTableRsp *pRsp = NULL; + SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp)); + if (pRsp) { + SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp)); + } + + taosMemoryFreeClear(msg); + + return TSDB_CODE_SUCCESS; + } + + SCH_ERR_JRET(schLaunchFetchTask(pJob)); + + taosMemoryFreeClear(msg); + + return TSDB_CODE_SUCCESS; + } + + if (pJob->fetchRes) { + SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->fetchRes); + SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); + } + + atomic_store_ptr(&pJob->fetchRes, rsp); + atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows)); + + if (rsp->completed) { + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC); + } + + SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed); + + msg = NULL; + schProcessOnDataFetched(pJob); + +_return: + + taosMemoryFreeClear(msg); + + SCH_RET(code); +} + +int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp) { + SRetrieveTableRsp *pRsp = NULL; + SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, rsp, pTask->plan->id.groupId, &pRsp)); + + if (pRsp) { + SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp)); + } + + return TSDB_CODE_SUCCESS; +} + // Note: no more task error processing, handled in function internal int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; @@ -305,61 +370,14 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SRetrieveTableRsp *pRsp = NULL; - SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, &rsp, pTask->plan->id.groupId, &pRsp)); - - if (pRsp) { - SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp)); - } + SCH_ERR_JRET(schProcessExplainRsp(pJob, pTask, &rsp)); break; } case TDMT_SCH_FETCH_RSP: case TDMT_SCH_MERGE_FETCH_RSP: { - SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; - - SCH_ERR_JRET(rspCode); - if (NULL == msg) { - SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); - } - - if (SCH_IS_EXPLAIN_JOB(pJob)) { - if (rsp->completed) { - SRetrieveTableRsp *pRsp = NULL; - SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp)); - if (pRsp) { - SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp)); - } - - taosMemoryFreeClear(msg); - - return TSDB_CODE_SUCCESS; - } - - SCH_ERR_JRET(schLaunchFetchTask(pJob)); - - taosMemoryFreeClear(msg); - - return TSDB_CODE_SUCCESS; - } - - if (pJob->fetchRes) { - SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->fetchRes); - taosMemoryFreeClear(rsp); - SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); - } - - atomic_store_ptr(&pJob->fetchRes, rsp); - atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows)); - - if (rsp->completed) { - SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC); - } - - SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed); - + code = schProcessFetchRsp(pJob, pTask, msg, rspCode); msg = NULL; - - schProcessOnDataFetched(pJob); + SCH_ERR_JRET(code); break; } case TDMT_SCH_DROP_TASK_RSP: { diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 94b2c4d372..3cecbc9ae3 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -20,6 +20,7 @@ #include "tmsg.h" #include "tref.h" #include "trpc.h" +#include "qworker.h" void schFreeTask(SSchJob *pJob, SSchTask *pTask) { schDeregisterTaskHb(pJob, pTask); @@ -89,6 +90,10 @@ _return: } int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) { + if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) { + return TSDB_CODE_SUCCESS; + } + SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); if (NULL == addr) { SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx, @@ -292,6 +297,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { .execId = pTask->execId, .addr = pTask->succeedAddr, .fetchMsgType = SCH_FETCH_TYPE(pTask), + .localExec = SCH_IS_LOCAL_EXEC_TASK(pJob, pTask), }; qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source); SCH_UNLOCK(SCH_WRITE, &parent->planLock); @@ -847,15 +853,21 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) { if (NULL == schMgmt.queryMgmt) { SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, (void **)&schMgmt.queryMgmt, NULL)); } - + + SArray *explainRes = NULL; SQWMsg qwMsg = {0}; qwMsg.msgInfo.taskType = TASK_TYPE_TEMP; qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob); qwMsg.msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask); qwMsg.msg = pTask->plan; qwMsg.msgType = pTask->plan->msgType; - - SCH_ERR_RET(qWorkerProcessLocalQuery((SQWorker*)schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &qwMsg)); + qwMsg.connInfo.handle = pJob->conn.pTrans; + + if (SCH_IS_EXPLAIN_JOB(pJob)) { + explainRes = taosArrayInit(pJob->taskNum, POINTER_BYTES); + } + + SCH_ERR_RET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &qwMsg, explainRes)); SCH_RET(schProcessOnTaskSuccess(pJob, pTask)); } @@ -878,7 +890,8 @@ int32_t schLaunchTaskImpl(void *param) { pTask->retryTimes++; pTask->waitRetry = false; - SCH_TASK_DLOG("start to launch task, execId %d, retry %d", pTask->execId, pTask->retryTimes); + SCH_TASK_DLOG("start to launch %s task, execId %d, retry %d", SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE", + pTask->execId, pTask->retryTimes); SCH_LOG_TASK_START_TS(pTask); @@ -893,7 +906,7 @@ int32_t schLaunchTaskImpl(void *param) { SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC); } - if (pJob->attr.localExec && SCH_IS_QUERY_JOB(pJob) && SCH_IS_DATA_MERGE_TASK(pTask)) { + if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) { SCH_ERR_JRET(schLaunchLocalTask(pJob, pTask)); } else { SCH_ERR_JRET(schLaunchRemoteTask(pJob, pTask)); @@ -986,6 +999,25 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { } } +int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) { + SCH_RET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask))); +} + +int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) { + void *pRsp = NULL; + SArray *explainRes = NULL; + + if (SCH_IS_EXPLAIN_JOB(pJob)) { + explainRes = taosArrayInit(pJob->taskNum, POINTER_BYTES); + } + + SCH_ERR_RET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &pRsp, explainRes)); + + SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS)); + + return TSDB_CODE_SUCCESS; +} + // Note: no more error processing, handled in function internal int32_t schLaunchFetchTask(SSchJob *pJob) { int32_t code = 0; @@ -996,7 +1028,11 @@ int32_t schLaunchFetchTask(SSchJob *pJob) { return TSDB_CODE_SUCCESS; } - SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask))); + if (SCH_IS_LOCAL_EXEC_TASK(pJob, pJob->fetchTask)) { + SCH_ERR_JRET(schExecLocalFetch(pJob, pJob->fetchTask)); + } else { + SCH_ERR_JRET(schExecRemoteFetch(pJob, pJob->fetchTask)); + } return TSDB_CODE_SUCCESS; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index f1ec7f5e04..0ccaef3857 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -17,6 +17,7 @@ #include "schInt.h" #include "tmsg.h" #include "tref.h" +#include "qworker.h" SSchedulerMgmt schMgmt = { .jobRef = -1, @@ -192,4 +193,7 @@ void schedulerDestroy(void) { schMgmt.hbConnections = NULL; } SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock); + + qWorkerDestroy(&schMgmt.queryMgmt); + schMgmt.queryMgmt = NULL; } -- GitLab