diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 1fa7dca7dc6ad975e87e18570c8a9a35d990bb7e..bf89e98ce37fd53301e1a3dcd28284a1ed44208a 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -52,6 +52,7 @@ typedef enum { #define QUERY_POLICY_VNODE 1 #define QUERY_POLICY_HYBRID 2 #define QUERY_POLICY_QNODE 3 +#define QUERY_POLICY_CLIENT 4 typedef struct STableComInfo { uint8_t numOfTags; // the number of tags in schema @@ -267,43 +268,43 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define qFatal(...) \ do { \ if (qDebugFlag & DEBUG_FATAL) { \ - taosPrintLog("QRY FATAL ", DEBUG_FATAL, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + taosPrintLog("QRY FATAL ", DEBUG_FATAL, qDebugFlag, __VA_ARGS__); \ } \ } while (0) #define qError(...) \ do { \ if (qDebugFlag & DEBUG_ERROR) { \ - taosPrintLog("QRY ERROR ", DEBUG_ERROR, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + taosPrintLog("QRY ERROR ", DEBUG_ERROR, qDebugFlag, __VA_ARGS__); \ } \ } while (0) #define qWarn(...) \ do { \ if (qDebugFlag & DEBUG_WARN) { \ - taosPrintLog("QRY WARN ", DEBUG_WARN, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + taosPrintLog("QRY WARN ", DEBUG_WARN, qDebugFlag, __VA_ARGS__); \ } \ } while (0) #define qInfo(...) \ do { \ if (qDebugFlag & DEBUG_INFO) { \ - taosPrintLog("QRY ", DEBUG_INFO, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + taosPrintLog("QRY ", DEBUG_INFO, qDebugFlag, __VA_ARGS__); \ } \ } while (0) #define qDebug(...) \ do { \ if (qDebugFlag & DEBUG_DEBUG) { \ - taosPrintLog("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + taosPrintLog("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \ } \ } while (0) #define qTrace(...) \ do { \ if (qDebugFlag & DEBUG_TRACE) { \ - taosPrintLog("QRY ", DEBUG_TRACE, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + taosPrintLog("QRY ", DEBUG_TRACE, qDebugFlag, __VA_ARGS__); \ } \ } while (0) #define qDebugL(...) \ do { \ if (qDebugFlag & DEBUG_DEBUG) { \ - taosPrintLongString("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + taosPrintLongString("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \ } \ } while (0) diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 87aefe5187ec7ca61a4de5f6f14adbbf26861dfc..b83ffe4db2349f268140cf3c94c49265d627a8db 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -29,6 +29,7 @@ enum { NODE_TYPE_QNODE, NODE_TYPE_SNODE, NODE_TYPE_MNODE, + NODE_TYPE_CLIENT, }; typedef struct SQWorkerCfg { @@ -55,7 +56,7 @@ typedef struct { uint64_t numOfErrors; } SQWorkerStat; -int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb); +int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb); int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg); diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index e6973cd390c10ff524f70549d161090582ee56ab..70bd7b0c7879c17176b9dfb6395db387c908f694 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -64,6 +64,7 @@ typedef bool (*schedulerChkKillFp)(void* param); typedef struct SSchedulerReq { bool syncReq; + bool localReq; SRequestConnInfo *pConn; SArray *pNodeList; SQueryPlan *pDag; diff --git a/include/util/tdef.h b/include/util/tdef.h index 2bc821b8736edf745a30e0e103734e4e7b7b31e4..b17b8cbc66a4aea7a9b3deaec571cee32464ec01 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -467,6 +467,7 @@ enum { #define SNODE_HANDLE -2 #define VNODE_HANDLE -3 #define BNODE_HANDLE -4 +#define CLIENT_HANDLE -5 #define TSDB_CONFIG_OPTION_LEN 32 #define TSDB_CONFIG_VALUE_LEN 64 diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index bf92a9ba6af41d8c3048684ed36eec84cc0a6235..e1cf078d577337ea4c9d66552ca158b8e01d14ad 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -27,6 +27,7 @@ #include "trpc.h" #include "tsched.h" #include "ttime.h" +#include "qworker.h" #define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_RELEASED 0 diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index f91ceb31840bbf8dccd9144d5a12a41e7f2f358a..54320445d48219bde170eea5f969b6b2630ada68 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -540,6 +540,20 @@ _return: return TSDB_CODE_SUCCESS; } + +int32_t buildClientPolicyNodeList(SRequestObj* pRequest, SArray** pNodeList) { + *pNodeList = taosArrayInit(1, sizeof(SQueryNodeLoad)); + SQueryNodeLoad load = {0}; + load.addr.nodeId = CLIENT_HANDLE; + + taosArrayPush(*pNodeList, &load); + + tscDebug("0x%" PRIx64 " client policy", pRequest->requestId); + + return TSDB_CODE_SUCCESS; +} + + int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) { SArray* pDbVgList = NULL; SArray* pQnodeList = NULL; @@ -585,6 +599,10 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList); break; } + case QUERY_POLICY_CLIENT: { + code = buildClientPolicyNodeList(pRequest, pNodeList); + break; + } default: tscError("unknown query policy: %d", tsQueryPolicy); return TSDB_CODE_TSC_APP_ERROR; @@ -645,6 +663,10 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList); break; } + case QUERY_POLICY_CLIENT: { + code = buildClientPolicyNodeList(pRequest, pNodeList); + break; + } default: tscError("unknown query policy: %d", tsQueryPolicy); return TSDB_CODE_TSC_APP_ERROR; @@ -667,6 +689,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList .requestObjRefId = pRequest->self}; SSchedulerReq req = { .syncReq = true, + .localReq = (tsQueryPolicy == CLIENT_HANDLE), .pConn = &conn, .pNodeList = pNodeList, .pDag = pDag, @@ -1042,6 +1065,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM .pTrans = pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self}; SSchedulerReq req = { .syncReq = false, + .localReq = (tsQueryPolicy == CLIENT_HANDLE), .pConn = &conn, .pNodeList = pNodeList, .pDag = pDag, diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 30860780807a820b041e27729f8e351fb46c99b3..9c435887e6881422107f3aa8e71ea007fa38d2f9 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -26,6 +26,7 @@ #include "tref.h" #include "trpc.h" #include "version.h" +#include "qworker.h" #define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_RELEASED 0 @@ -75,6 +76,8 @@ void taos_cleanup(void) { cleanupTaskQueue(); + qWorkerDestroy(&tscQueryMgmt); + taosConvDestroy(); tscInfo("all local resources released"); diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 654f46ec85682a21e8ef0009c3fcb654180c93b1..0e897de4e7bd64cdaffcd0c0f6da339ab8db4851 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -170,7 +170,7 @@ _exit: } int32_t mndInitQuery(SMnode *pMnode) { - if (qWorkerInit(NODE_TYPE_MNODE, MNODE_HANDLE, NULL, (void **)&pMnode->pQuery, &pMnode->msgCb) != 0) { + if (qWorkerInit(NODE_TYPE_MNODE, MNODE_HANDLE, (void **)&pMnode->pQuery, &pMnode->msgCb) != 0) { mError("failed to init qworker in mnode since %s", terrstr()); return -1; } diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index b65189153ea4f0aa36680586e472eac4007a457f..efdc8b46934f9677868312c9310cffe989e73ab1 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -26,7 +26,7 @@ SQnode *qndOpen(const SQnodeOpt *pOption) { return NULL; } - if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, &pOption->msgCb)) { + if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, (void **)&pQnode->pQuery, &pOption->msgCb)) { taosMemoryFreeClear(pQnode); return NULL; } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 8d799e919d1e4c06cfec6438d7a4a34fc336993d..1461d371810a0328b87d40102b602ab9aee8bb26 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -16,7 +16,7 @@ #include "vnd.h" int vnodeQueryOpen(SVnode *pVnode) { - return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), NULL, (void **)&pVnode->pQuery, &pVnode->msgCb); + return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), (void **)&pVnode->pQuery, &pVnode->msgCb); } void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); } diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 729ac474e4720a13a2da8d463820d05db72970f0..8efc2475242e6988dea14864b306d2f4e77d2e05 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -29,7 +29,7 @@ extern "C" { #include "executor.h" #include "trpc.h" -#define QW_DEFAULT_SCHEDULER_NUMBER 10000 +#define QW_DEFAULT_SCHEDULER_NUMBER 100 #define QW_DEFAULT_TASK_NUMBER 10000 #define QW_DEFAULT_SCH_TASK_NUMBER 10000 #define QW_DEFAULT_SHORT_RUN_TIMES 2 @@ -93,7 +93,7 @@ typedef struct SQWMsg { void *node; int32_t code; int32_t msgType; - char *msg; + void *msg; int32_t msgLen; SQWMsgInfo msgInfo; SRpcHandleInfo connInfo; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index f006096ce20a45e18a5b9d990c9c63b621638ac5..77c8aa5cd9d8a243b8b3085b04fa61aee60d6da7 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -512,11 +512,6 @@ int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) { int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t code = 0; - bool queryRsped = false; - SSubplan *plan = NULL; - SQWPhaseInput input = {0}; - qTaskInfo_t pTaskInfo = NULL; - DataSinkHandle sinkHandle = NULL; SQWTaskCtx *ctx = NULL; QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo)); @@ -578,19 +573,12 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - // QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code)); - // QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); - - // queryRsped = true; - ctx->level = plan->level; atomic_store_ptr(&ctx->taskHandle, pTaskInfo); atomic_store_ptr(&ctx->sinkHandle, sinkHandle); - if (pTaskInfo && sinkHandle) { - qwSaveTbVersionInfo(pTaskInfo, ctx); - QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); - } + qwSaveTbVersionInfo(pTaskInfo, ctx); + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); _return: @@ -600,11 +588,6 @@ _return: input.msgType = qwMsg->msgType; code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); - // if (!queryRsped) { - // qwBuildAndSendQueryRsp(&qwMsg->connInfo, code); - // QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); - //} - QW_RET(TSDB_CODE_SUCCESS); } @@ -1000,8 +983,8 @@ _return: QW_RET(TSDB_CODE_SUCCESS); } -int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) { - if (NULL == qWorkerMgmt || pMsgCb->mgmt == NULL) { +int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb) { + if (NULL == qWorkerMgmt || (pMsgCb && pMsgCb->mgmt == NULL)) { qError("invalid param to init qworker"); QW_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -1024,22 +1007,9 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - if (cfg) { - mgmt->cfg = *cfg; - if (0 == mgmt->cfg.maxSchedulerNum) { - mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER; - } - if (0 == mgmt->cfg.maxTaskNum) { - mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER; - } - if (0 == mgmt->cfg.maxSchTaskNum) { - mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER; - } - } else { - mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER; - mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER; - mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER; - } + mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER; + mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER; + mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER; mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); @@ -1064,7 +1034,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW mgmt->nodeType = nodeType; mgmt->nodeId = nodeId; - mgmt->msgCb = *pMsgCb; + mgmt->msgCb = pMsgCb ? *pMsgCb : NULL; mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt); if (mgmt->refId < 0) { @@ -1140,3 +1110,51 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt return TSDB_CODE_SUCCESS; } + +int32_t qWorkerProcessLocalQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { + int32_t code = 0; + SQWTaskCtx *ctx = NULL; + SSubplan *plan = (SSubplan *)qwMsg->msg; + SQWPhaseInput input = {0}; + qTaskInfo_t pTaskInfo = NULL; + DataSinkHandle sinkHandle = NULL; + + QW_ERR_JRET(qwAddTaskCtx(QW_FPARAMS())); + + QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); + + QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); + + ctx->taskType = qwMsg->msgInfo.taskType; + ctx->explain = qwMsg->msgInfo.explain; + ctx->needFetch = qwMsg->msgInfo.needFetch; + ctx->msgType = qwMsg->msgType; + + code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH); + if (code) { + QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); + QW_ERR_JRET(code); + } + + if (NULL == sinkHandle || NULL == pTaskInfo) { + QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle); + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + ctx->level = plan->level; + atomic_store_ptr(&ctx->taskHandle, pTaskInfo); + atomic_store_ptr(&ctx->sinkHandle, sinkHandle); + + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); + +_return: + + if (ctx) { + QW_UPDATE_RSP_CODE(ctx, code); + qwReleaseTaskCtx(mgmt, ctx); + } + + QW_RET(code); +} + + diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 1f76ea1e7e294090019d8f67e108b1b97cb84d6b..60d6594c1b62389147fd7f8dd7c5de0ee4b4211b 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -877,7 +877,7 @@ TEST(seqTest, normalCase) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); @@ -913,7 +913,7 @@ TEST(seqTest, cancelFirst) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); @@ -950,7 +950,7 @@ TEST(seqTest, randCase) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); int32_t t = 0; @@ -1021,7 +1021,7 @@ TEST(seqTest, multithreadRand) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); TdThreadAttr thattr; @@ -1084,7 +1084,7 @@ TEST(rcTest, shortExecshortDelay) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); qwtTestMaxExecTaskUsec = 0; @@ -1168,7 +1168,7 @@ TEST(rcTest, longExecshortDelay) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); qwtTestMaxExecTaskUsec = 1000000; @@ -1254,7 +1254,7 @@ TEST(rcTest, shortExeclongDelay) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); qwtTestMaxExecTaskUsec = 0; @@ -1338,7 +1338,7 @@ TEST(rcTest, dropTest) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); tsem_init(&qwtTestQuerySem, 0, 0); diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 957fd46ba5a767858a3bb5bbe50142b4f1c1ce47..6292aac6d2afb71f80952fe87b79107acc7d68d2 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -151,6 +151,7 @@ typedef struct SSchedulerMgmt { SSchStat stat; SRWLatch hbLock; SHashObj *hbConnections; + void *queryMgmt; } SSchedulerMgmt; typedef struct SSchCallbackParamHeader { @@ -237,6 +238,7 @@ typedef struct SSchJobAttr { bool queryJob; bool needFetch; bool needFlowCtrl; + bool localExec; } SSchJobAttr; typedef struct { @@ -302,6 +304,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_IS_DATA_BIND_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) #define SCH_IS_DATA_BIND_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)) #define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum) +#define SCH_IS_DATA_MERGE_TASK(task) (!SCH_IS_DATA_BIND_TASK(task)) #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 98501427ab7b006daa78bc5d1c6c7c8d377572a0..a314ffaa3090cbf411fc2382b6c8ce2d57d85c60 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -719,6 +719,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) { } pJob->attr.explainMode = pReq->pDag->explainInfo.mode; + pJob->attr.localExec = pReq->localReq; pJob->conn = *pReq->pConn; if (pReq->sql) { pJob->sql = strdup(pReq->sql); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 9cab39c30122072207daa9e9639ab92645fc1633..94b2c4d3721832a6ee703b17b9c14fe347604d53 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -228,7 +228,6 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) SCH_RET(errCode); } -// Note: no more task error processing, handled in function internal int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { bool moved = false; int32_t code = 0; @@ -819,6 +818,48 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) { return TSDB_CODE_SUCCESS; } +int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) { + SSubplan *plan = pTask->plan; + int32_t code = 0; + + if (NULL == pTask->msg) { // TODO add more detailed reason for failure + code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen); + if (TSDB_CODE_SUCCESS != code) { + SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, + pTask->msgLen); + SCH_ERR_RET(code); + } else { + SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg); + } + } + + SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask)); + + if (SCH_IS_QUERY_JOB(pJob)) { + SCH_ERR_RET(schEnsureHbConnection(pJob, pTask)); + } + + SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType)); +} + +int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) { + //SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask)); + if (NULL == schMgmt.queryMgmt) { + SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, (void **)&schMgmt.queryMgmt, NULL)); + } + + SQWMsg qwMsg = {0}; + qwMsg.msgInfo.taskType = TASK_TYPE_TEMP; + qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob); + qwMsg.msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask); + qwMsg.msg = pTask->plan; + qwMsg.msgType = pTask->plan->msgType; + + SCH_ERR_RET(qWorkerProcessLocalQuery((SQWorker*)schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &qwMsg)); + + SCH_RET(schProcessOnTaskSuccess(pJob, pTask)); +} + int32_t schLaunchTaskImpl(void *param) { SSchTaskCtx *pCtx = (SSchTaskCtx *)param; SSchJob *pJob = schAcquireJob(pCtx->jobRid); @@ -852,27 +893,12 @@ int32_t schLaunchTaskImpl(void *param) { SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC); } - SSubplan *plan = pTask->plan; - - if (NULL == pTask->msg) { // TODO add more detailed reason for failure - code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen); - if (TSDB_CODE_SUCCESS != code) { - SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, - pTask->msgLen); - SCH_ERR_JRET(code); - } else { - SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg); - } - } - - SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask)); - - if (SCH_IS_QUERY_JOB(pJob)) { - SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask)); + if (pJob->attr.localExec && SCH_IS_QUERY_JOB(pJob) && SCH_IS_DATA_MERGE_TASK(pTask)) { + SCH_ERR_JRET(schLaunchLocalTask(pJob, pTask)); + } else { + SCH_ERR_JRET(schLaunchRemoteTask(pJob, pTask)); } - SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType)); - _return: taosMemoryFree(param); @@ -892,7 +918,6 @@ _return: } int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { - SSchTaskCtx *param = taosMemoryCalloc(1, sizeof(SSchTaskCtx)); if (NULL == param) { SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);