diff --git a/Jenkinsfile b/Jenkinsfile index 4b84e1f88e71b3a43bc63df10edffe8a8758052a..0842795f0a496103f23fcee1c59a87c5c2bd4f3b 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -79,7 +79,7 @@ def pre_test(){ rm -rf debug mkdir debug cd debug - cmake .. > /dev/null + cmake .. -DBUILD_TEST=true > /dev/null make -j4> /dev/null ''' diff --git a/Jenkinsfile2 b/Jenkinsfile2 index b95b3ff86bf5005f486841abf3c5ce428b3d6957..ac152e2aa4605b570d6db4289c594fcb3d01aa0e 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -173,7 +173,7 @@ def pre_test_build_mac() { ''' sh ''' cd ${WK}/debug - cmake .. + cmake .. -DBUILD_TEST=true make -j8 ''' sh ''' @@ -302,7 +302,7 @@ def pre_test_build_win() { set CL=/MP8 echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> cmake" time /t - cmake .. -G "NMake Makefiles JOM" || exit 7 + cmake .. -G "NMake Makefiles JOM" -DBUILD_TEST=true || exit 7 echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> jom -j 6" time /t jom -j 6 || exit 8 diff --git a/cmake/cmake.options b/cmake/cmake.options index 3baccde4d711e7c7a535829c95a0ee8cdff3fae6..60ff00affc01408b084a8993441e4fe7052f4977 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -2,6 +2,12 @@ # Deps options # ========================================================= +option( + BUILD_TEST + "If build unit tests using googletest" + OFF +) + IF(${TD_WINDOWS}) MESSAGE("build pthread Win32") @@ -45,12 +51,6 @@ IF(${TD_WINDOWS}) "If build wingetopt on Windows" ON ) - - option( - BUILD_TEST - "If build unit tests using googletest" - ON - ) option( TDENGINE_3 @@ -65,28 +65,8 @@ IF(${TD_WINDOWS}) ) ELSEIF (TD_DARWIN_64) - add_definitions(-DCOMPILER_SUPPORTS_CXX13) - option( - BUILD_TEST - "If build unit tests using googletest" - ON - ) -ELSE () - include(CheckCXXCompilerFlag) - CHECK_CXX_COMPILER_FLAG("-std=c++13" COMPILER_SUPPORTS_CXX13) - IF(${COMPILER_SUPPORTS_CXX13}) + IF(${BUILD_TEST}) add_definitions(-DCOMPILER_SUPPORTS_CXX13) - option( - BUILD_TEST - "If build unit tests using googletest" - ON - ) - ELSE () - option( - BUILD_TEST - "If build unit tests using googletest" - OFF - ) ENDIF () ENDIF () diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c4a4e926480ef117dd9651113e82b0fb80e1d2e1..d11fd4da10cde3117a263affd18b0c9c5bc95316 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1424,6 +1424,14 @@ typedef struct { SExplainExecInfo* subplanInfo; } SExplainRsp; +typedef struct { + SExplainRsp rsp; + uint64_t qId; + uint64_t tId; + int64_t rId; + int32_t eId; +} SExplainLocalRsp; + typedef struct STableScanAnalyzeInfo { uint64_t totalRows; uint64_t totalCheckedRows; @@ -1438,6 +1446,7 @@ typedef struct STableScanAnalyzeInfo { int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp); int32_t tDeserializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp); +void tFreeSExplainRsp(SExplainRsp *pRsp); typedef struct { char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 25a6221fcb5344cd1f0d98af15840b3905321612..8c1d957381583f1e91a5c1620b90638ea3061ce1 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -29,6 +29,15 @@ typedef void* DataSinkHandle; struct SRpcMsg; struct SSubplan; +typedef int32_t (*localFetchFp)(void *, uint64_t, uint64_t, uint64_t, int64_t, int32_t, void**, SArray*); + +typedef struct { + void *handle; + bool localExec; + localFetchFp fp; + SArray *explainRes; +} SLocalFetch; + typedef struct { void* tqReader; void* meta; @@ -127,7 +136,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table * @param handle * @return */ -int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds); +int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch *pLocal); int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds); /** diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 31d67a85a7bea917a969b43226cba48717dbe77a..a531eeffdee9dbd94318ec73c08f076311f1bb3b 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -396,6 +396,7 @@ typedef struct SDownstreamSourceNode { uint64_t schedId; int32_t execId; int32_t fetchMsgType; + bool localExec; } SDownstreamSourceNode; typedef struct SExchangePhysiNode { diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 44a9e10679c0d0d30ec743e6bf624dcd912b32e9..e9f3864f6738bc7714db49d8f2373900fe10bfb6 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -52,6 +52,7 @@ typedef enum { #define QUERY_POLICY_VNODE 1 #define QUERY_POLICY_HYBRID 2 #define QUERY_POLICY_QNODE 3 +#define QUERY_POLICY_CLIENT 4 typedef struct STableComInfo { uint8_t numOfTags; // the number of tags in schema @@ -269,43 +270,43 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define qFatal(...) \ do { \ if (qDebugFlag & DEBUG_FATAL) { \ - taosPrintLog("QRY FATAL ", DEBUG_FATAL, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + taosPrintLog("QRY FATAL ", DEBUG_FATAL, qDebugFlag, __VA_ARGS__); \ } \ } while (0) #define qError(...) \ do { \ if (qDebugFlag & DEBUG_ERROR) { \ - taosPrintLog("QRY ERROR ", DEBUG_ERROR, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + taosPrintLog("QRY ERROR ", DEBUG_ERROR, qDebugFlag, __VA_ARGS__); \ } \ } while (0) #define qWarn(...) \ do { \ if (qDebugFlag & DEBUG_WARN) { \ - taosPrintLog("QRY WARN ", DEBUG_WARN, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + taosPrintLog("QRY WARN ", DEBUG_WARN, qDebugFlag, __VA_ARGS__); \ } \ } while (0) #define qInfo(...) \ do { \ if (qDebugFlag & DEBUG_INFO) { \ - taosPrintLog("QRY ", DEBUG_INFO, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + taosPrintLog("QRY ", DEBUG_INFO, qDebugFlag, __VA_ARGS__); \ } \ } while (0) #define qDebug(...) \ do { \ if (qDebugFlag & DEBUG_DEBUG) { \ - taosPrintLog("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + taosPrintLog("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \ } \ } while (0) #define qTrace(...) \ do { \ if (qDebugFlag & DEBUG_TRACE) { \ - taosPrintLog("QRY ", DEBUG_TRACE, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + taosPrintLog("QRY ", DEBUG_TRACE, qDebugFlag, __VA_ARGS__); \ } \ } while (0) #define qDebugL(...) \ do { \ if (qDebugFlag & DEBUG_DEBUG) { \ - taosPrintLongString("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ + taosPrintLongString("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \ } \ } while (0) diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 87aefe5187ec7ca61a4de5f6f14adbbf26861dfc..99f51892284bb4eb9884b763bef051771d39e1b7 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -29,6 +29,7 @@ enum { NODE_TYPE_QNODE, NODE_TYPE_SNODE, NODE_TYPE_MNODE, + NODE_TYPE_CLIENT, }; typedef struct SQWorkerCfg { @@ -55,7 +56,24 @@ typedef struct { uint64_t numOfErrors; } SQWorkerStat; -int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb); +typedef struct SQWMsgInfo { + int8_t taskType; + int8_t explain; + int8_t needFetch; +} SQWMsgInfo; + +typedef struct SQWMsg { + void *node; + int32_t code; + int32_t msgType; + void *msg; + int32_t msgLen; + SQWMsgInfo msgInfo; + SRpcHandleInfo connInfo; +} SQWMsg; + + +int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb); int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg); @@ -77,10 +95,14 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes); -void qWorkerDestroy(void **qWorkerMgmt); +void qWorkerDestroy(void **qWorkerMgmt); int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat); +int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, SQWMsg *qwMsg, SArray *explainRes); + +int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp, SArray* explainRes); + #ifdef __cplusplus } #endif diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 738d057e6ac531e726cd069eaf0de35bdc15c365..077c23c1b580bb693e0ec581920b9ebea46de751 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -64,6 +64,7 @@ typedef bool (*schedulerChkKillFp)(void* param); typedef struct SSchedulerReq { bool syncReq; + bool localReq; SRequestConnInfo *pConn; SArray *pNodeList; SQueryPlan *pDag; diff --git a/include/util/tdef.h b/include/util/tdef.h index 0c7c2ea3a31cfce495230a6ce5f303c49108ffe4..43fd31afa7cc634e473c93518b78776c182d48d4 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -483,6 +483,7 @@ enum { #define SNODE_HANDLE -2 #define VNODE_HANDLE -3 #define BNODE_HANDLE -4 +#define CLIENT_HANDLE -5 #define TSDB_CONFIG_OPTION_LEN 32 #define TSDB_CONFIG_VALUE_LEN 64 diff --git a/packaging/docker/dockerManifest.sh b/packaging/docker/dockerManifest.sh index 71788423f6e58b2788346ef2804cd4d03ee54b02..8f71e30fbdca1cc9adf8e9b46c652475822e4b08 100755 --- a/packaging/docker/dockerManifest.sh +++ b/packaging/docker/dockerManifest.sh @@ -1,6 +1,7 @@ #!/bin/bash set -e #set -x +set -v # dockerbuild.sh # -n [version number] @@ -11,8 +12,9 @@ set -e version="" passWord="" verType="" +dockerLatest="n" -while getopts "hn:p:V:" arg +while getopts "hn:p:V:a:" arg do case $arg in n) @@ -29,9 +31,15 @@ do ;; h) echo "Usage: `basename $0` -n [version number] " - echo " -p [password for docker hub] " + echo " -p [password for docker hub] " + echo " -V [stable |beta] " + echo " -a [y | n ] " exit 0 ;; + a) + #echo "dockerLatest=$OPTARG" + dockerLatest=$(echo $OPTARG) + ;; ?) #unknow option echo "unkonw argument" exit 1 @@ -41,42 +49,55 @@ done echo "version=${version}" -#docker manifest rm tdengine/tdengine -#docker manifest rm tdengine/tdengine:${version} -if [ "$verType" == "beta" ]; then - docker manifest create -a tdengine/tdengine-beta:${version} tdengine/tdengine-amd64-beta:${version} tdengine/tdengine-aarch64-beta:${version} tdengine/tdengine-aarch32-beta:${version} - docker manifest create -a tdengine/tdengine-beta:latest tdengine/tdengine-amd64-beta:latest tdengine/tdengine-aarch64-beta:latest tdengine/tdengine-aarch32-beta:latest - docker manifest rm tdengine/tdengine-beta:${version} - docker manifest rm tdengine/tdengine-beta:latest - docker manifest create -a tdengine/tdengine-beta:${version} tdengine/tdengine-amd64-beta:${version} tdengine/tdengine-aarch64-beta:${version} tdengine/tdengine-aarch32-beta:${version} - docker manifest create -a tdengine/tdengine-beta:latest tdengine/tdengine-amd64-beta:latest tdengine/tdengine-aarch64-beta:latest tdengine/tdengine-aarch32-beta:latest - docker manifest inspect tdengine/tdengine:latest - docker manifest inspect tdengine/tdengine:${version} - docker login -u tdengine -p ${passWord} #replace the docker registry username and password - docker manifest push tdengine/tdengine-beta:${version} - docker manifest push tdengine/tdengine-beta:latest -elif [ "$verType" == "stable" ]; then - docker manifest create -a tdengine/tdengine:${version} tdengine/tdengine-amd64:${version} tdengine/tdengine-aarch64:${version} tdengine/tdengine-aarch32:${version} - docker manifest create -a tdengine/tdengine:latest tdengine/tdengine-amd64:latest tdengine/tdengine-aarch64:latest tdengine/tdengine-aarch32:latest - docker manifest rm tdengine/tdengine:latest - docker manifest rm tdengine/tdengine:${version} - docker manifest create -a tdengine/tdengine:${version} tdengine/tdengine-amd64:${version} tdengine/tdengine-aarch64:${version} tdengine/tdengine-aarch32:${version} - docker manifest create -a tdengine/tdengine:latest tdengine/tdengine-amd64:latest tdengine/tdengine-aarch64:latest tdengine/tdengine-aarch32:latest - docker manifest inspect tdengine/tdengine:latest - docker manifest inspect tdengine/tdengine:${version} - docker login -u tdengine -p ${passWord} #replace the docker registry username and password - docker manifest push tdengine/tdengine:${version} - docker manifest push tdengine/tdengine:latest -else +if [ "$verType" == "stable" ]; then + verType=stable + dockerinput=TDengine-server-${version}-Linux-$cpuType.tar.gz + dockerinput_x64=TDengine-server-${version}-Linux-amd64.tar.gz + dockerim=tdengine/tdengine + dockeramd64=tdengine/tdengine-amd64 + dockeraarch64=tdengine/tdengine-aarch64 + dockeraarch32=tdengine/tdengine-aarch32 +elif [ "$verType" == "beta" ];then + verType=beta + tagVal=ver-${version}-beta + dockerinput=TDengine-server-${version}-${verType}-Linux-$cpuType.tar.gz + dockerinput_x64=TDengine-server-${version}-${verType}-Linux-amd64.tar.gz + dockerim=tdengine/tdengine-beta + dockeramd64=tdengine/tdengine-amd64-beta + dockeraarch64=tdengine/tdengine-aarch64-beta + dockeraarch32=tdengine/tdengine-aarch32-beta + else echo "unknow verType, nor stabel or beta" exit 1 fi -# docker manifest create -a tdengine/${dockername}:${version} tdengine/tdengine-amd64:${version} tdengine/tdengine-aarch64:${version} tdengine/tdengine-aarch32:${version} -# docker manifest create -a tdengine/${dockername}:latest tdengine/tdengine-amd64:latest tdengine/tdengine-aarch64:latest tdengine/tdengine-aarch32:latest +username="tdengine" -# docker login -u tdengine -p ${passWord} #replace the docker registry username and password +# generate docker verison +echo "generate ${dockerim}:${version}" +docker manifest create -a ${dockerim}:${version} ${dockeramd64}:${version} ${dockeraarch64}:${version} +docker manifest inspect ${dockerim}:${version} +docker manifest rm ${dockerim}:${version} +docker manifest create -a ${dockerim}:${version} ${dockeramd64}:${version} ${dockeraarch64}:${version} +docker manifest inspect ${dockerim}:${version} +docker login -u ${username} -p ${passWord} +docker manifest push ${dockerim}:${version} + + +# generate docker latest +echo "generate ${dockerim}:latest " + +if [ ${dockerLatest} == 'y' ] ;then + echo "docker manifest create -a ${dockerim}:latest ${dockeramd64}:latest ${dockeraarch64}:latest" + docker manifest create -a ${dockerim}:latest ${dockeramd64}:latest ${dockeraarch64}:latest + docker manifest inspect ${dockerim}:latest + docker manifest rm ${dockerim}:latest + docker manifest create -a ${dockerim}:latest ${dockeramd64}:latest ${dockeraarch64}:latest + docker manifest inspect ${dockerim}:latest + docker login -u tdengine -p ${passWord} #replace the docker registry username and password + docker manifest push ${dockerim}:latest + docker pull tdengine/tdengine:latest + +fi -# docker manifest push tdengine/tdengine:latest -# # how set latest version ??? diff --git a/packaging/docker/dockerbuild.sh b/packaging/docker/dockerbuild.sh index 541ae6ec1398ae40a450382d25aa53bec18a8ced..8b0b0c190c5acd9352ddc2699db3375e18d5fb9c 100755 --- a/packaging/docker/dockerbuild.sh +++ b/packaging/docker/dockerbuild.sh @@ -149,26 +149,4 @@ rm -rf temp1.data if [ ${dockerLatest} == 'y' ] ;then docker tag tdengine/tdengine-${dockername}:${version} tdengine/tdengine-${dockername}:latest docker push tdengine/tdengine-${dockername}:latest - echo ">>>>>>>>>>>>> check whether tdengine/tdengine-${dockername}:latest has been published correctly" - docker run -d --name doctestla -p 7030-7049:6030-6049 -p 7030-7049:6030-6049/udp tdengine/tdengine-${dockername}:latest - sleep 2 - curl -u root:taosdata -d 'show variables;' 127.0.0.1:7041/rest/sql > temp2.data - version_latest=` cat temp2.data |jq .data| jq '.[]' |grep "version" -A 2 -B 1 | jq ".[1]" ` - echo "${version_latest}" - if [ "${version_latest}" == "\"${version}\"" ] ; then - echo "docker version is right " - else - echo "docker version is wrong " - exit 1 - fi fi -rm -rf temp2.data - -if [ -n "$(docker ps -aq)" ] ;then - echo "delte docker process" - docker stop $(docker ps -aq) - docker rm $(docker ps -aq) -fi - -cd ${scriptDir} -rm -f ${pkgFile} diff --git a/packaging/release.bat b/packaging/release.bat index 591227382f9cec4a2fa1308a9b827994430f7236..b87ae68e2b2f8b0507992206af6ed482e5a9392c 100644 --- a/packaging/release.bat +++ b/packaging/release.bat @@ -44,8 +44,6 @@ cmake ../../ -G "NMake Makefiles JOM" -DCMAKE_MAKE_PROGRAM=jom -DBUILD_TOOLS=tru cmake --build . rd /s /Q C:\TDengine cmake --install . -for /r c:\TDengine %%i in (*.dll) do signtool sign /f D:\\123.pfx /p taosdata %%i -for /r c:\TDengine %%i in (*.exe) do signtool sign /f D:\\123.pfx /p taosdata %%i if not %errorlevel% == 0 ( call :RUNFAILED build x64 failed & exit /b 1) cd %package_dir% iscc /DMyAppInstallName="%packagServerName_x64%" /DMyAppVersion="%2" /DMyAppExcludeSource="" tools\tdengine.iss /O..\release @@ -53,7 +51,6 @@ if not %errorlevel% == 0 ( call :RUNFAILED package %packagServerName_x64% faile iscc /DMyAppInstallName="%packagClientName_x64%" /DMyAppVersion="%2" /DMyAppExcludeSource="taosd.exe" tools\tdengine.iss /O..\release if not %errorlevel% == 0 ( call :RUNFAILED package %packagClientName_x64% failed & exit /b 1) -for /r ..\release %%i in (*.exe) do signtool sign /f d:\\123.pfx /p taosdata %%i goto EXIT0 :USAGE diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 624abe6da5f5b9c1f6133defdc3d83c672479da4..6a4ef3d821ce4180e40b48d58f423477997628e7 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -27,6 +27,7 @@ #include "trpc.h" #include "tsched.h" #include "ttime.h" +#include "qworker.h" #define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_RELEASED 0 diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 034f78e5f6ca1806406eb05d631441030856771b..be907ea1e27cadb391f3010a3ba2e19735a83055 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -379,7 +379,7 @@ int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) { } bool qnodeRequired(SRequestObj* pRequest) { - if (QUERY_POLICY_VNODE == tsQueryPolicy) { + if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) { return false; } @@ -483,7 +483,8 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) { int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) { SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad)); - + char *policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client"; + int32_t dbNum = taosArrayGetSize(pDbVgList); for (int32_t i = 0; i < dbNum; ++i) { SArray* pVg = taosArrayGetP(pDbVgList, i); @@ -504,20 +505,20 @@ int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArr int32_t vnodeNum = taosArrayGetSize(nodeList); if (vnodeNum > 0) { - tscDebug("0x%" PRIx64 " vnode policy, use vnode list, num:%d", pRequest->requestId, vnodeNum); + tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policy, vnodeNum); goto _return; } int32_t mnodeNum = taosArrayGetSize(pMnodeList); if (mnodeNum <= 0) { - tscDebug("0x%" PRIx64 " vnode policy, empty node list", pRequest->requestId); + tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policy); goto _return; } void* pData = taosArrayGet(pMnodeList, 0); taosArrayAddBatch(nodeList, pData, mnodeNum); - tscDebug("0x%" PRIx64 " vnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum); + tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum); _return: @@ -561,7 +562,8 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray int32_t code = 0; switch (tsQueryPolicy) { - case QUERY_POLICY_VNODE: { + case QUERY_POLICY_VNODE: + case QUERY_POLICY_CLIENT: { if (pResultMeta) { pDbVgList = taosArrayInit(4, POINTER_BYTES); @@ -622,7 +624,8 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* int32_t code = 0; switch (tsQueryPolicy) { - case QUERY_POLICY_VNODE: { + case QUERY_POLICY_VNODE: + case QUERY_POLICY_CLIENT: { int32_t dbNum = taosArrayGetSize(pRequest->dbList); if (dbNum > 0) { SCatalog* pCtg = NULL; @@ -682,6 +685,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList .requestObjRefId = pRequest->self}; SSchedulerReq req = { .syncReq = true, + .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT), .pConn = &conn, .pNodeList = pNodeList, .pDag = pDag, @@ -1061,6 +1065,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM .pTrans = pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self}; SSchedulerReq req = { .syncReq = false, + .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT), .pConn = &conn, .pNodeList = pNodeList, .pDag = pDag, diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 831212bdee980f66363f68778a5ddd98a475923c..650b16f8553a88b662f00ab5dba2951ad9ef2984 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -26,6 +26,7 @@ #include "tref.h" #include "trpc.h" #include "version.h" +#include "qworker.h" #define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_RELEASED 0 diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 09f5d4b4b2c60daf8b7f11026e07a988703d6fbc..e2e59108aa924b883e451af3aebc2282bd4dc6c7 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -285,7 +285,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "shellActivityTimer", tsShellActivityTimer, 1, 120, 1) != 0) return -1; if (cfgAddInt32(pCfg, "compressMsgSize", tsCompressMsgSize, -1, 100000000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, 1) != 0) return -1; - if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1; + if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 4, 1) != 0) return -1; if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1; if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, true) != 0) return -1; if (cfgAddInt32(pCfg, "queryNodeChunkSize", tsQueryNodeChunkSize, 1024, 128 * 1024, true) != 0) return -1; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 09908150248c7ad1f098d4751ea5fee655ad078f..e4f3a89272fee0e2f54324f3d1c45193ea630bbf 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4365,7 +4365,7 @@ int32_t tDeserializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) { if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->numOfPlans) < 0) return -1; if (pRsp->numOfPlans > 0) { - pRsp->subplanInfo = taosMemoryMalloc(pRsp->numOfPlans * sizeof(SExplainExecInfo)); + pRsp->subplanInfo = taosMemoryCalloc(pRsp->numOfPlans, sizeof(SExplainExecInfo)); if (pRsp->subplanInfo == NULL) return -1; } for (int32_t i = 0; i < pRsp->numOfPlans; ++i) { @@ -4373,7 +4373,7 @@ int32_t tDeserializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) { if (tDecodeDouble(&decoder, &pRsp->subplanInfo[i].totalCost) < 0) return -1; if (tDecodeU64(&decoder, &pRsp->subplanInfo[i].numOfRows) < 0) return -1; if (tDecodeU32(&decoder, &pRsp->subplanInfo[i].verboseLen) < 0) return -1; - if (tDecodeBinary(&decoder, (uint8_t **)&pRsp->subplanInfo[i].verboseInfo, &pRsp->subplanInfo[i].verboseLen) < 0) + if (tDecodeBinaryAlloc(&decoder, &pRsp->subplanInfo[i].verboseInfo, NULL) < 0) return -1; } @@ -4383,6 +4383,19 @@ int32_t tDeserializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) { return 0; } +void tFreeSExplainRsp(SExplainRsp *pRsp) { + if (NULL == pRsp) { + return; + } + + for (int32_t i = 0; i < pRsp->numOfPlans; ++i) { + SExplainExecInfo *pExec = pRsp->subplanInfo + i; + taosMemoryFree(pExec->verboseInfo); + } + + taosMemoryFreeClear(pRsp->subplanInfo); +} + int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) { int32_t headLen = sizeof(SMsgHead); if (buf != NULL) { diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 654f46ec85682a21e8ef0009c3fcb654180c93b1..0e897de4e7bd64cdaffcd0c0f6da339ab8db4851 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -170,7 +170,7 @@ _exit: } int32_t mndInitQuery(SMnode *pMnode) { - if (qWorkerInit(NODE_TYPE_MNODE, MNODE_HANDLE, NULL, (void **)&pMnode->pQuery, &pMnode->msgCb) != 0) { + if (qWorkerInit(NODE_TYPE_MNODE, MNODE_HANDLE, (void **)&pMnode->pQuery, &pMnode->msgCb) != 0) { mError("failed to init qworker in mnode since %s", terrstr()); return -1; } diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index b65189153ea4f0aa36680586e472eac4007a457f..efdc8b46934f9677868312c9310cffe989e73ab1 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -26,7 +26,7 @@ SQnode *qndOpen(const SQnodeOpt *pOption) { return NULL; } - if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, &pOption->msgCb)) { + if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, (void **)&pQnode->pQuery, &pOption->msgCb)) { taosMemoryFreeClear(pQnode); return NULL; } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index c6ec31cc21e94bcf6db086d047297302f30fffc7..8d1525e081c1224394adf64c7d7ba8e5a6338cc7 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -690,7 +690,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma while (1) { uint64_t ts; - int32_t code = qExecTaskOpt(taskInfo, pResList, &ts); + int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, NULL); if (code < 0) { if (code == TSDB_CODE_QRY_IN_EXEC) { break; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 8cfe1d8adfddb675856238501977fb9d9d8aee6e..825bf8a0a7c0adbf5a19c08c2e2418da3b6e571a 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -16,7 +16,7 @@ #include "vnd.h" int vnodeQueryOpen(SVnode *pVnode) { - return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), NULL, (void **)&pVnode->pQuery, &pVnode->msgCb); + return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), (void **)&pVnode->pQuery, &pVnode->msgCb); } void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); } diff --git a/source/libs/command/inc/commandInt.h b/source/libs/command/inc/commandInt.h index 706985f89468455b49876ac8d20edef634c42ff9..ad677a81ff304a3637c9a24c2c20b4f03cee3ac1 100644 --- a/source/libs/command/inc/commandInt.h +++ b/source/libs/command/inc/commandInt.h @@ -99,8 +99,10 @@ extern "C" { typedef struct SExplainGroup { int32_t nodeNum; + int32_t nodeIdx; int32_t physiPlanExecNum; int32_t physiPlanExecIdx; + bool singleChannel; SRWLatch lock; SSubplan *plan; SArray *nodeExecInfo; //Array diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index a1a8ed66e63261d9e32a0999e0c8e2570e6180dc..c5de44e1f67d3e4843a0357be88bbac730fc2d99 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -21,14 +21,14 @@ #include "tdatablock.h" int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplainResNode **pRes); -int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level); +int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level, bool singleChannel); void qExplainFreeResNode(SExplainResNode *resNode) { if (NULL == resNode) { return; } - taosMemoryFreeClear(resNode->pExecInfo); + taosArrayDestroy(resNode->pExecInfo); SNode *node = NULL; FOREACH(node, resNode->pChildren) { qExplainFreeResNode((SExplainResNode *)node); } @@ -56,8 +56,9 @@ void qExplainFreeCtx(SExplainCtx *pCtx) { int32_t num = taosArrayGetSize(group->nodeExecInfo); for (int32_t i = 0; i < num; ++i) { SExplainRsp *rsp = taosArrayGet(group->nodeExecInfo, i); - taosMemoryFreeClear(rsp->subplanInfo); + tFreeSExplainRsp(rsp); } + taosArrayDestroy(group->nodeExecInfo); } pIter = taosHashIterate(pCtx->groupHash, pIter); @@ -66,6 +67,7 @@ void qExplainFreeCtx(SExplainCtx *pCtx) { taosHashCleanup(pCtx->groupHash); taosArrayDestroy(pCtx->rows); + taosMemoryFreeClear(pCtx->tbuf); taosMemoryFree(pCtx); } @@ -248,7 +250,7 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo return TSDB_CODE_SUCCESS; } -int32_t qExplainGenerateResNodeExecInfo(SArray **pExecInfo, SExplainGroup *group) { +int32_t qExplainGenerateResNodeExecInfo(SPhysiNode *pNode, SArray **pExecInfo, SExplainGroup *group) { *pExecInfo = taosArrayInit(group->nodeNum, sizeof(SExplainExecInfo)); if (NULL == (*pExecInfo)) { qError("taosArrayInit %d explainExecInfo failed", group->nodeNum); @@ -256,17 +258,28 @@ int32_t qExplainGenerateResNodeExecInfo(SArray **pExecInfo, SExplainGroup *group } SExplainRsp *rsp = NULL; - for (int32_t i = 0; i < group->nodeNum; ++i) { - rsp = taosArrayGet(group->nodeExecInfo, i); - /* - if (group->physiPlanExecIdx >= rsp->numOfPlans) { - qError("physiPlanIdx %d exceed plan num %d", group->physiPlanExecIdx, rsp->numOfPlans); - return TSDB_CODE_QRY_APP_ERROR; - } + if (group->singleChannel) { + if (0 == group->physiPlanExecIdx) { + group->nodeIdx = 0; + } + + rsp = taosArrayGet(group->nodeExecInfo, group->nodeIdx++); + if (group->physiPlanExecIdx >= rsp->numOfPlans) { + qError("physiPlanIdx %d exceed plan num %d", group->physiPlanExecIdx, rsp->numOfPlans); + return TSDB_CODE_QRY_APP_ERROR; + } + + taosArrayPush(*pExecInfo, rsp->subplanInfo + group->physiPlanExecIdx); + } else { + for (int32_t i = 0; i < group->nodeNum; ++i) { + rsp = taosArrayGet(group->nodeExecInfo, i); + if (group->physiPlanExecIdx >= rsp->numOfPlans) { + qError("physiPlanIdx %d exceed plan num %d", group->physiPlanExecIdx, rsp->numOfPlans); + return TSDB_CODE_QRY_APP_ERROR; + } - taosArrayPush(*pExecInfo, rsp->subplanInfo + group->physiPlanExecIdx); - */ - taosArrayPush(*pExecInfo, rsp->subplanInfo); + taosArrayPush(*pExecInfo, rsp->subplanInfo + group->physiPlanExecIdx); + } } ++group->physiPlanExecIdx; @@ -291,7 +304,7 @@ int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplai resNode->pNode = pNode; if (group->nodeExecInfo) { - QRY_ERR_JRET(qExplainGenerateResNodeExecInfo(&resNode->pExecInfo, group)); + QRY_ERR_JRET(qExplainGenerateResNodeExecInfo(pNode, &resNode->pExecInfo, group)); } QRY_ERR_JRET(qExplainGenerateResChildren(pNode, group, &resNode->pChildren)); @@ -801,7 +814,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i } } - QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcStartGroupId, level + 1)); + QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcStartGroupId, level + 1, pExchNode->singleChannel)); break; } case QUERY_NODE_PHYSICAL_PLAN_SORT: { @@ -1533,7 +1546,7 @@ int32_t qExplainResNodeToRows(SExplainResNode *pResNode, SExplainCtx *ctx, int32 return TSDB_CODE_SUCCESS; } -int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level) { +int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level, bool singleChannel) { SExplainResNode *node = NULL; int32_t code = 0; SExplainCtx *ctx = (SExplainCtx *)pCtx; @@ -1544,6 +1557,9 @@ int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level) { QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } + group->singleChannel = singleChannel; + group->physiPlanExecIdx = 0; + QRY_ERR_RET(qExplainGenerateResNode(group->plan->pNode, group, &node)); QRY_ERR_JRET(qExplainResNodeToRows(node, ctx, level)); @@ -1707,7 +1723,7 @@ int32_t qExplainAppendPlanRows(SExplainCtx *pCtx) { } int32_t qExplainGenerateRsp(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp) { - QRY_ERR_RET(qExplainAppendGroupResRows(pCtx, pCtx->rootGroupId, 0)); + QRY_ERR_RET(qExplainAppendGroupResRows(pCtx, pCtx->rootGroupId, 0, false)); QRY_ERR_RET(qExplainAppendPlanRows(pCtx)); QRY_ERR_RET(qExplainGetRspFromCtx(pCtx, pRsp)); @@ -1723,7 +1739,7 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t SExplainGroup *group = taosHashGet(ctx->groupHash, &groupId, sizeof(groupId)); if (NULL == group) { qError("group %d not in groupHash", groupId); - taosMemoryFreeClear(pRspMsg->subplanInfo); + tFreeSExplainRsp(pRspMsg); QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } @@ -1732,7 +1748,7 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t group->nodeExecInfo = taosArrayInit(group->nodeNum, sizeof(SExplainRsp)); if (NULL == group->nodeExecInfo) { qError("taosArrayInit %d explainExecInfo failed", group->nodeNum); - taosMemoryFreeClear(pRspMsg->subplanInfo); + tFreeSExplainRsp(pRspMsg); taosWUnLockLatch(&group->lock); QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1742,7 +1758,7 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t } else if (taosArrayGetSize(group->nodeExecInfo) >= group->nodeNum) { qError("group execInfo already full, size:%d, nodeNum:%d", (int32_t)taosArrayGetSize(group->nodeExecInfo), group->nodeNum); - taosMemoryFreeClear(pRspMsg->subplanInfo); + tFreeSExplainRsp(pRspMsg); taosWUnLockLatch(&group->lock); QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); @@ -1751,13 +1767,14 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t if (group->physiPlanExecNum != pRspMsg->numOfPlans) { qError("physiPlanExecNum %d mismatch with others %d in group %d", pRspMsg->numOfPlans, group->physiPlanExecNum, groupId); - taosMemoryFreeClear(pRspMsg->subplanInfo); + tFreeSExplainRsp(pRspMsg); taosWUnLockLatch(&group->lock); QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } taosArrayPush(group->nodeExecInfo, pRspMsg); + groupDone = (taosArrayGetSize(group->nodeExecInfo) >= group->nodeNum); taosWUnLockLatch(&group->lock); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 6484b6cb31b3e10d67f62812e20c76091c35c02f..dc98b0547957ab910afb6b4d465c020cfbd60799 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -183,6 +183,7 @@ typedef struct SExecTaskInfo { EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] SSubplan* pSubplan; struct SOperatorInfo* pRoot; + SLocalFetch localFetch; } SExecTaskInfo; enum { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index be8bdf73081b7db31fd0168be42fbf10afd63337..373cb451f4f73fc0d2f6245f9ea9af06617ea59f 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -479,10 +479,14 @@ static void freeBlock(void* param) { blockDataDestroy(pBlock); } -int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) { +int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch* pLocal) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; int64_t threadId = taosGetSelfPthreadId(); + if (pLocal) { + memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal)); + } + taosArrayClearEx(pResList, freeBlock); int64_t curOwner = 0; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 31a7b0facfea798b40d11b7b6670c59057d4cb05..7fcd41dc3e10d77ccba6ef15f7420b78190c4948 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1776,49 +1776,57 @@ void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) { size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); - SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq)); - if (NULL == pMsg) { - pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; - return pTaskInfo->code; - } - SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex); SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex); ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY); - qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId, - sourceIndex, totalSources); - - pMsg->header.vgId = htonl(pSource->addr.nodeId); - pMsg->sId = htobe64(pSource->schedId); - pMsg->taskId = htobe64(pSource->taskId); - pMsg->queryId = htobe64(pTaskInfo->id.queryId); - pMsg->execId = htonl(pSource->execId); - - // send the fetch remote task result reques - SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); - if (NULL == pMsgSendInfo) { - taosMemoryFreeClear(pMsg); - qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); - pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; - return pTaskInfo->code; - } - SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper)); pWrapper->exchangeId = pExchangeInfo->self; pWrapper->sourceIndex = sourceIndex; - pMsgSendInfo->param = pWrapper; - pMsgSendInfo->paramFreeFp = taosMemoryFree; - pMsgSendInfo->msgInfo.pData = pMsg; - pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq); - pMsgSendInfo->msgType = pSource->fetchMsgType; - pMsgSendInfo->fp = loadRemoteDataCallback; + if (pSource->localExec) { + SDataBuf pBuf = {0}; + int32_t code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId, pSource->taskId, 0, pSource->execId, &pBuf.pData, pTaskInfo->localFetch.explainRes); + loadRemoteDataCallback(pWrapper, &pBuf, code); + taosMemoryFree(pWrapper); + } else { + SResFetchReq* pMsg = taosMemoryCalloc(1, sizeof(SResFetchReq)); + if (NULL == pMsg) { + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + return pTaskInfo->code; + } + + qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId, + sourceIndex, totalSources); + + pMsg->header.vgId = htonl(pSource->addr.nodeId); + pMsg->sId = htobe64(pSource->schedId); + pMsg->taskId = htobe64(pSource->taskId); + pMsg->queryId = htobe64(pTaskInfo->id.queryId); + pMsg->execId = htonl(pSource->execId); + + // send the fetch remote task result reques + SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + if (NULL == pMsgSendInfo) { + taosMemoryFreeClear(pMsg); + qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); + pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + return pTaskInfo->code; + } + + pMsgSendInfo->param = pWrapper; + pMsgSendInfo->paramFreeFp = taosMemoryFree; + pMsgSendInfo->msgInfo.pData = pMsg; + pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq); + pMsgSendInfo->msgType = pSource->fetchMsgType; + pMsgSendInfo->fp = loadRemoteDataCallback; - int64_t transporterId = 0; - int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo); + int64_t transporterId = 0; + int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo); + } + return TSDB_CODE_SUCCESS; } @@ -3531,7 +3539,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { - pOperator = createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo); + pOperator = createExchangeOperatorInfo(pHandle ? pHandle->pMsgCb->clientRpc : NULL, (SExchangePhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; if (pHandle->vnode) { @@ -4047,8 +4055,10 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { cleanupTableSchemaInfo(&pTaskInfo->schemaInfo); cleanupStreamInfo(&pTaskInfo->streamInfo); - nodesDestroyNode((SNode*)pTaskInfo->pSubplan); - + if (!pTaskInfo->localFetch.localExec) { + nodesDestroyNode((SNode*)pTaskInfo->pSubplan); + } + taosMemoryFreeClear(pTaskInfo->sql); taosMemoryFreeClear(pTaskInfo->id.str); taosMemoryFreeClear(pTaskInfo); diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index dbe08bb7b2a37f16ad3c0590ae21cfaccf4794cf..c145952778aa17fb660a4732e2c0d6c068de521a 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -627,6 +627,7 @@ static int32_t downstreamSourceCopy(const SDownstreamSourceNode* pSrc, SDownstre COPY_SCALAR_FIELD(schedId); COPY_SCALAR_FIELD(execId); COPY_SCALAR_FIELD(fetchMsgType); + COPY_SCALAR_FIELD(localExec); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 0cb1bbe6c3ad8c0c5a82cc82cf324aecce786944..3a3226359998d23da690aa361c5dffd1ce7d12fc 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -29,7 +29,7 @@ extern "C" { #include "executor.h" #include "trpc.h" -#define QW_DEFAULT_SCHEDULER_NUMBER 10000 +#define QW_DEFAULT_SCHEDULER_NUMBER 100 #define QW_DEFAULT_TASK_NUMBER 10000 #define QW_DEFAULT_SCH_TASK_NUMBER 10000 #define QW_DEFAULT_SHORT_RUN_TIMES 2 @@ -83,22 +83,6 @@ typedef struct SQWDebug { extern SQWDebug gQWDebug; -typedef struct SQWMsgInfo { - int8_t taskType; - int8_t explain; - int8_t needFetch; -} SQWMsgInfo; - -typedef struct SQWMsg { - void *node; - int32_t code; - int32_t msgType; - char *msg; - int32_t msgLen; - SQWMsgInfo msgInfo; - SRpcHandleInfo connInfo; -} SQWMsg; - typedef struct SQWHbParam { bool inUse; int32_t qwrId; @@ -133,6 +117,7 @@ typedef struct SQWTaskCtx { int8_t taskType; int8_t explain; int8_t needFetch; + int8_t localExec; int32_t msgType; int32_t fetchType; int32_t execId; @@ -150,6 +135,7 @@ typedef struct SQWTaskCtx { int8_t events[QW_EVENT_MAX]; + SArray *explainRes; void *taskHandle; void *sinkHandle; STbVerInfo tbInfo; diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h index 3ff5b5950f6e241af51d77da843d7f00e9fdd0b6..b46c5d6baf90cdbba9883b7a0cfe24f175c7c4b5 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -42,7 +42,7 @@ int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t c int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray* pExecList); int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code); void qwFreeFetchRsp(void *msg); -int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); +int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **rsp); int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index c38c9ac1cf522d747188e5e3fff5c7a81944a490..e4271dfcdaf2579ce884646a52bee523e419c3c5 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -9,10 +9,10 @@ #include "tmsg.h" #include "tname.h" -int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) { +int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **rsp) { int32_t msgSize = sizeof(SRetrieveTableRsp) + length; - SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcReallocCont(*rsp, msgSize); + SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)(rpcMalloc ? rpcReallocCont(*rsp, msgSize) : taosMemoryRealloc(*rsp, msgSize)); if (NULL == pRsp) { qError("rpcMallocCont %d failed", msgSize); QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index af9399bbdc91e2c8a404dc22218a2b042903ee71..3c102938d02a74d7beda2e95e21b34b4f844fae5 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -63,11 +63,25 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo)); QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList)); - SRpcHandleInfo connInfo = ctx->ctrlConnInfo; - connInfo.ahandle = NULL; - int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList); - taosArrayDestroyEx(execInfoList, freeItem); - QW_ERR_RET(code); + if (ctx->localExec) { + SExplainLocalRsp localRsp = {0}; + localRsp.rsp.numOfPlans = taosArrayGetSize(execInfoList); + SExplainExecInfo *pExec = taosMemoryCalloc(localRsp.rsp.numOfPlans, sizeof(SExplainExecInfo)); + memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo)); + localRsp.rsp.subplanInfo = pExec; + localRsp.qId = qId; + localRsp.tId = tId; + localRsp.rId = rId; + localRsp.eId = eId; + taosArrayPush(ctx->explainRes, &localRsp); + taosArrayDestroy(execInfoList); + } else { + SRpcHandleInfo connInfo = ctx->ctrlConnInfo; + connInfo.ahandle = NULL; + int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList); + taosArrayDestroyEx(execInfoList, freeItem); + QW_ERR_RET(code); + } } if (!ctx->needFetch) { @@ -86,6 +100,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { int32_t execNum = 0; qTaskInfo_t taskHandle = ctx->taskHandle; DataSinkHandle sinkHandle = ctx->sinkHandle; + SLocalFetch localFetch = {(void*)mgmt, ctx->localExec, qWorkerProcessLocalFetch, ctx->explainRes}; SArray *pResList = taosArrayInit(4, POINTER_BYTES); while (true) { @@ -94,8 +109,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { // if *taskHandle is NULL, it's killed right now if (taskHandle) { qwDbgSimulateSleep(); - - code = qExecTaskOpt(taskHandle, pResList, &useconds); + code = qExecTaskOpt(taskHandle, pResList, &useconds, &localFetch); if (code) { if (code != TSDB_CODE_OPS_NOT_SUPPORT) { QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); @@ -235,7 +249,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); if (NULL == rsp) { - QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); + QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, len, &rsp)); *pOutput = output; } else { pOutput->queryEnd = output.queryEnd; @@ -256,7 +270,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, *dataLen += len; - QW_ERR_RET(qwMallocFetchRsp(*dataLen, &rsp)); + QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &rsp)); output.pData = rsp->data + *dataLen - len; code = dsGetDataBlock(ctx->sinkHandle, &output); @@ -480,16 +494,18 @@ _return: } if (QW_PHASE_POST_QUERY == phase && ctx) { - ctx->queryRsped = true; - - bool rsped = false; - SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo}; - qwDbgSimulateRedirect(&qwMsg, ctx, &rsped); - qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); - if (!rsped) { - qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx); - QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); + if (!ctx->localExec) { + bool rsped = false; + SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo}; + qwDbgSimulateRedirect(&qwMsg, ctx, &rsped); + qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); + if (!rsped) { + qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx); + QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); + } } + + ctx->queryRsped = true; } if (ctx) { @@ -518,11 +534,6 @@ int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) { int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t code = 0; - bool queryRsped = false; - SSubplan *plan = NULL; - SQWPhaseInput input = {0}; - qTaskInfo_t pTaskInfo = NULL; - DataSinkHandle sinkHandle = NULL; SQWTaskCtx *ctx = NULL; QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo)); @@ -562,6 +573,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { ctx->explain = qwMsg->msgInfo.explain; ctx->needFetch = qwMsg->msgInfo.needFetch; ctx->msgType = qwMsg->msgType; + ctx->localExec = false; // QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg); @@ -584,19 +596,12 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - // QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code)); - // QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); - - // queryRsped = true; - ctx->level = plan->level; atomic_store_ptr(&ctx->taskHandle, pTaskInfo); atomic_store_ptr(&ctx->sinkHandle, sinkHandle); - if (pTaskInfo && sinkHandle) { - qwSaveTbVersionInfo(pTaskInfo, ctx); - QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); - } + qwSaveTbVersionInfo(pTaskInfo, ctx); + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); _return: @@ -606,11 +611,6 @@ _return: input.msgType = qwMsg->msgType; code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); - // if (!queryRsped) { - // qwBuildAndSendQueryRsp(&qwMsg->connInfo, code); - // QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); - //} - QW_RET(TSDB_CODE_SUCCESS); } @@ -1006,8 +1006,8 @@ _return: QW_RET(TSDB_CODE_SUCCESS); } -int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) { - if (NULL == qWorkerMgmt || pMsgCb->mgmt == NULL) { +int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb) { + if (NULL == qWorkerMgmt || (pMsgCb && pMsgCb->mgmt == NULL)) { qError("invalid param to init qworker"); QW_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -1030,22 +1030,9 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - if (cfg) { - mgmt->cfg = *cfg; - if (0 == mgmt->cfg.maxSchedulerNum) { - mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER; - } - if (0 == mgmt->cfg.maxTaskNum) { - mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER; - } - if (0 == mgmt->cfg.maxSchTaskNum) { - mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER; - } - } else { - mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER; - mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER; - mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER; - } + mgmt->cfg.maxSchedulerNum = QW_DEFAULT_SCHEDULER_NUMBER; + mgmt->cfg.maxTaskNum = QW_DEFAULT_TASK_NUMBER; + mgmt->cfg.maxSchTaskNum = QW_DEFAULT_SCH_TASK_NUMBER; mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); @@ -1070,7 +1057,11 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW mgmt->nodeType = nodeType; mgmt->nodeId = nodeId; - mgmt->msgCb = *pMsgCb; + if (pMsgCb) { + mgmt->msgCb = *pMsgCb; + } else { + memset(&mgmt->msgCb, 0, sizeof(mgmt->msgCb)); + } mgmt->refId = taosAddRef(gQwMgmt.qwRef, mgmt); if (mgmt->refId < 0) { @@ -1153,3 +1144,112 @@ int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pSt return TSDB_CODE_SUCCESS; } + +int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, SQWMsg *qwMsg, SArray *explainRes) { + SQWorker *mgmt = (SQWorker*)pMgmt; + int32_t code = 0; + SQWTaskCtx *ctx = NULL; + SSubplan *plan = (SSubplan *)qwMsg->msg; + SQWPhaseInput input = {0}; + qTaskInfo_t pTaskInfo = NULL; + DataSinkHandle sinkHandle = NULL; + SReadHandle rHandle = {0}; + + QW_ERR_JRET(qwAddTaskCtx(QW_FPARAMS())); + QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); + + QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL)); + QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); + + ctx->taskType = qwMsg->msgInfo.taskType; + ctx->explain = qwMsg->msgInfo.explain; + ctx->needFetch = qwMsg->msgInfo.needFetch; + ctx->msgType = qwMsg->msgType; + ctx->localExec = true; + ctx->explainRes = explainRes; + + rHandle.pMsgCb = taosMemoryCalloc(1, sizeof(SMsgCb)); + rHandle.pMsgCb->clientRpc = qwMsg->connInfo.handle; + + code = qCreateExecTask(&rHandle, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH); + if (code) { + QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); + QW_ERR_JRET(code); + } + + if (NULL == sinkHandle || NULL == pTaskInfo) { + QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle); + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + ctx->level = plan->level; + atomic_store_ptr(&ctx->taskHandle, pTaskInfo); + atomic_store_ptr(&ctx->sinkHandle, sinkHandle); + + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); + +_return: + + taosMemoryFree(rHandle.pMsgCb); + + input.code = code; + input.msgType = qwMsg->msgType; + code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); + + if (ctx) { + QW_UPDATE_RSP_CODE(ctx, code); + qwReleaseTaskCtx(mgmt, ctx); + } + + QW_RET(code); +} + +int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp, SArray* explainRes) { + SQWorker *mgmt = (SQWorker*)pMgmt; + int32_t code = 0; + int32_t dataLen = 0; + SQWTaskCtx *ctx = NULL; + void *rsp = NULL; + bool queryStop = false; + + SQWPhaseInput input = {0}; + + QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_FETCH, &input, NULL)); + + QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); + + ctx->msgType = TDMT_SCH_MERGE_FETCH; + ctx->explainRes = explainRes; + + SOutputData sOutput = {0}; + + while (true) { + QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); + + if (NULL == rsp) { + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop)); + + continue; + } else { + bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); + + qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); + if (qComplete) { + atomic_store_8((int8_t *)&ctx->queryEnd, true); + } + + break; + } + } + +_return: + + *pRsp = rsp; + + input.code = code; + code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_FETCH, &input, NULL); + + QW_RET(code); +} + + diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 1f76ea1e7e294090019d8f67e108b1b97cb84d6b..60d6594c1b62389147fd7f8dd7c5de0ee4b4211b 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -877,7 +877,7 @@ TEST(seqTest, normalCase) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); @@ -913,7 +913,7 @@ TEST(seqTest, cancelFirst) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); @@ -950,7 +950,7 @@ TEST(seqTest, randCase) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); int32_t t = 0; @@ -1021,7 +1021,7 @@ TEST(seqTest, multithreadRand) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); TdThreadAttr thattr; @@ -1084,7 +1084,7 @@ TEST(rcTest, shortExecshortDelay) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); qwtTestMaxExecTaskUsec = 0; @@ -1168,7 +1168,7 @@ TEST(rcTest, longExecshortDelay) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); qwtTestMaxExecTaskUsec = 1000000; @@ -1254,7 +1254,7 @@ TEST(rcTest, shortExeclongDelay) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); qwtTestMaxExecTaskUsec = 0; @@ -1338,7 +1338,7 @@ TEST(rcTest, dropTest) { SMsgCb msgCb = {0}; msgCb.mgmt = (void *)mockPointer; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); + code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); tsem_init(&qwtTestQuerySem, 0, 0); diff --git a/source/libs/scheduler/CMakeLists.txt b/source/libs/scheduler/CMakeLists.txt index 88180391ddaae2b5dfd2b2f33a3bc4a34cac8e09..3288120b67518aa532db7579a7677086899514c7 100644 --- a/source/libs/scheduler/CMakeLists.txt +++ b/source/libs/scheduler/CMakeLists.txt @@ -9,7 +9,7 @@ target_include_directories( target_link_libraries( scheduler - PUBLIC os util nodes planner qcom common catalog transport command + PUBLIC os util nodes planner qcom common catalog transport command qworker executor ) if(${BUILD_TEST}) diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index a62531a8757f272f2ada32be4966e349ec46e769..7ced4f626c413d7f208d524e833c3b2626678751 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -151,6 +151,7 @@ typedef struct SSchedulerMgmt { SSchStat stat; SRWLatch hbLock; SHashObj *hbConnections; + void *queryMgmt; } SSchedulerMgmt; typedef struct SSchCallbackParamHeader { @@ -235,8 +236,10 @@ typedef struct SSchTask { typedef struct SSchJobAttr { EExplainMode explainMode; bool queryJob; + bool insertJob; bool needFetch; bool needFlowCtrl; + bool localExec; } SSchJobAttr; typedef struct { @@ -263,7 +266,7 @@ typedef struct SSchJob { SHashObj *taskList; SHashObj *execTasks; // executing and executed tasks, key:taskid, value:SQueryTask* SHashObj *flowCtrl; // key is ep, element is SSchFlowControl - + SExplainCtx *explainCtx; int8_t status; SQueryNodeAddr resNode; @@ -304,6 +307,8 @@ extern SSchedulerMgmt schMgmt; #define SCH_IS_DATA_BIND_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) #define SCH_IS_DATA_BIND_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)) #define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum) +#define SCH_IS_DATA_MERGE_TASK(task) (!SCH_IS_DATA_BIND_TASK(task)) +#define SCH_IS_LOCAL_EXEC_TASK(_job, _task) ((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && (!SCH_IS_INSERT_JOB(_job)) && (!SCH_IS_DATA_BIND_QRY_TASK(_task))) #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) @@ -326,8 +331,9 @@ extern SSchedulerMgmt schMgmt; #define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH) #define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY) -#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0) +#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } else { (_job)->attr.insertJob = true; } } while (0) #define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) +#define SCH_IS_INSERT_JOB(_job) ((_job)->attr.insertJob) #define SCH_JOB_NEED_FETCH(_job) ((_job)->attr.needFetch) #define SCH_JOB_NEED_WAIT(_job) (!SCH_IS_QUERY_JOB(_job)) #define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job)) @@ -502,6 +508,8 @@ void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode); int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode); int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode); bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync); +int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode); +int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp); extern SSchDebug gSCHDebug; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 988049059461e6c88087881028076fa0be02aa74..5e47c0a0edb23a923220fd45c5b014603ed201b2 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -720,6 +720,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) { } pJob->attr.explainMode = pReq->pDag->explainInfo.mode; + pJob->attr.localExec = pReq->localReq; pJob->conn = *pReq->pConn; if (pReq->sql) { pJob->sql = strdup(pReq->sql); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index c6dc57d752aba9e0bc40fb208584443f2edf57f2..1b59c06140daad7370c7721c172b3da2a8163732 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -72,6 +72,71 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { return TSDB_CODE_SUCCESS; } +int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode) { + SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; + int32_t code = 0; + + SCH_ERR_JRET(rspCode); + + if (NULL == msg) { + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + if (SCH_IS_EXPLAIN_JOB(pJob)) { + if (rsp->completed) { + SRetrieveTableRsp *pRsp = NULL; + SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp)); + if (pRsp) { + SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp)); + } + + taosMemoryFreeClear(msg); + + return TSDB_CODE_SUCCESS; + } + + SCH_ERR_JRET(schLaunchFetchTask(pJob)); + + taosMemoryFreeClear(msg); + + return TSDB_CODE_SUCCESS; + } + + if (pJob->fetchRes) { + SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->fetchRes); + SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); + } + + atomic_store_ptr(&pJob->fetchRes, rsp); + atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows)); + + if (rsp->completed) { + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC); + } + + SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed); + + msg = NULL; + schProcessOnDataFetched(pJob); + +_return: + + taosMemoryFreeClear(msg); + + SCH_RET(code); +} + +int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp) { + SRetrieveTableRsp *pRsp = NULL; + SCH_ERR_RET(qExplainUpdateExecInfo(pJob->explainCtx, rsp, pTask->plan->id.groupId, &pRsp)); + + if (pRsp) { + SCH_ERR_RET(schProcessOnExplainDone(pJob, pTask, pRsp)); + } + + return TSDB_CODE_SUCCESS; +} + // Note: no more task error processing, handled in function internal int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; @@ -301,65 +366,20 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa SExplainRsp rsp = {0}; if (tDeserializeSExplainRsp(msg, msgSize, &rsp)) { - taosMemoryFree(rsp.subplanInfo); + tFreeSExplainRsp(&rsp); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SRetrieveTableRsp *pRsp = NULL; - SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, &rsp, pTask->plan->id.groupId, &pRsp)); + SCH_ERR_JRET(schProcessExplainRsp(pJob, pTask, &rsp)); - if (pRsp) { - SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp)); - } + taosMemoryFreeClear(msg); break; } case TDMT_SCH_FETCH_RSP: case TDMT_SCH_MERGE_FETCH_RSP: { - SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; - - SCH_ERR_JRET(rspCode); - if (NULL == msg) { - SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); - } - - if (SCH_IS_EXPLAIN_JOB(pJob)) { - if (rsp->completed) { - SRetrieveTableRsp *pRsp = NULL; - SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp)); - if (pRsp) { - SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp)); - } - - taosMemoryFreeClear(msg); - - return TSDB_CODE_SUCCESS; - } - - SCH_ERR_JRET(schLaunchFetchTask(pJob)); - - taosMemoryFreeClear(msg); - - return TSDB_CODE_SUCCESS; - } - - if (pJob->fetchRes) { - SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->fetchRes); - taosMemoryFreeClear(rsp); - SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); - } - - atomic_store_ptr(&pJob->fetchRes, rsp); - atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows)); - - if (rsp->completed) { - SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC); - } - - SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed); - + code = schProcessFetchRsp(pJob, pTask, msg, rspCode); msg = NULL; - - schProcessOnDataFetched(pJob); + SCH_ERR_JRET(code); break; } case TDMT_SCH_DROP_TASK_RSP: { diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 2cb227d84b1332550c23006a7392f0e7346d2600..b6f96a188e4fc06f365b6ccf527c3587abb959af 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -20,6 +20,7 @@ #include "tmsg.h" #include "tref.h" #include "trpc.h" +#include "qworker.h" #include "tglobal.h" void schFreeTask(SSchJob *pJob, SSchTask *pTask) { @@ -90,6 +91,10 @@ _return: } int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) { + if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) { + return TSDB_CODE_SUCCESS; + } + SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); if (NULL == addr) { SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx, @@ -230,7 +235,6 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) SCH_RET(errCode); } -// Note: no more task error processing, handled in function internal int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { bool moved = false; int32_t code = 0; @@ -295,6 +299,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { .execId = pTask->execId, .addr = pTask->succeedAddr, .fetchMsgType = SCH_FETCH_TYPE(pTask), + .localExec = SCH_IS_LOCAL_EXEC_TASK(pJob, pTask), }; qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source); SCH_UNLOCK(SCH_WRITE, &parent->planLock); @@ -825,6 +830,120 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) { return TSDB_CODE_SUCCESS; } +int32_t schHandleExplainRes(SArray *pExplainRes) { + int32_t code = 0; + int32_t resNum = taosArrayGetSize(pExplainRes); + if (resNum <= 0) { + goto _return; + } + + SSchTask *pTask = NULL; + SSchJob *pJob = NULL; + + for (int32_t i = 0; i < resNum; ++i) { + SExplainLocalRsp* localRsp = taosArrayGet(pExplainRes, i); + + qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg", localRsp->qId, localRsp->tId); + + pJob = schAcquireJob(localRsp->rId); + if (NULL == pJob) { + qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, localRsp->qId, localRsp->tId, localRsp->rId); + SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST); + } + + int8_t status = 0; + if (schJobNeedToStop(pJob, &status)) { + SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status)); + schReleaseJob(pJob->refId); + SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR); + } + + code = schGetTaskInJob(pJob, localRsp->tId, &pTask); + + if (TSDB_CODE_SUCCESS == code) { + code = schProcessExplainRsp(pJob, pTask, &localRsp->rsp); + } + + schReleaseJob(pJob->refId); + + qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x", localRsp->qId, localRsp->tId, code); + + SCH_ERR_JRET(code); + + localRsp->rsp.numOfPlans = 0; + localRsp->rsp.subplanInfo = NULL; + pTask = NULL; + pJob = NULL; + } + +_return: + + for (int32_t i = 0; i < resNum; ++i) { + SExplainLocalRsp* localRsp = taosArrayGet(pExplainRes, i); + tFreeSExplainRsp(&localRsp->rsp); + } + + taosArrayDestroy(pExplainRes); + + SCH_RET(code); +} + +int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) { + SSubplan *plan = pTask->plan; + int32_t code = 0; + + if (NULL == pTask->msg) { // TODO add more detailed reason for failure + code = qSubPlanToMsg(plan, &pTask->msg, &pTask->msgLen); + if (TSDB_CODE_SUCCESS != code) { + SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, + pTask->msgLen); + SCH_ERR_RET(code); + } else if (tsQueryPlannerTrace) { + char *msg = NULL; + int32_t msgLen = 0; + qSubPlanToString(plan, &msg, &msgLen); + SCH_TASK_DLOGL("physical plan len:%d, %s", msgLen, msg); + taosMemoryFree(msg); + } + } + + SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask)); + + if (SCH_IS_QUERY_JOB(pJob)) { + SCH_ERR_RET(schEnsureHbConnection(pJob, pTask)); + } + + SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType)); +} + +int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) { + //SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask)); + if (NULL == schMgmt.queryMgmt) { + SCH_ERR_RET(qWorkerInit(NODE_TYPE_CLIENT, CLIENT_HANDLE, (void **)&schMgmt.queryMgmt, NULL)); + } + + SArray *explainRes = NULL; + SQWMsg qwMsg = {0}; + qwMsg.msgInfo.taskType = TASK_TYPE_TEMP; + qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob); + qwMsg.msgInfo.needFetch = SCH_TASK_NEED_FETCH(pTask); + qwMsg.msg = pTask->plan; + qwMsg.msgType = pTask->plan->msgType; + qwMsg.connInfo.handle = pJob->conn.pTrans; + + if (SCH_IS_EXPLAIN_JOB(pJob)) { + explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp)); + } + + SCH_ERR_RET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &qwMsg, explainRes)); + + if (SCH_IS_EXPLAIN_JOB(pJob)) { + SCH_ERR_RET(schHandleExplainRes(explainRes)); + } + + SCH_RET(schProcessOnTaskSuccess(pJob, pTask)); +} + int32_t schLaunchTaskImpl(void *param) { SSchTaskCtx *pCtx = (SSchTaskCtx *)param; SSchJob *pJob = schAcquireJob(pCtx->jobRid); @@ -848,7 +967,8 @@ int32_t schLaunchTaskImpl(void *param) { pTask->retryTimes++; pTask->waitRetry = false; - SCH_TASK_DLOG("start to launch task, execId %d, retry %d", pTask->execId, pTask->retryTimes); + SCH_TASK_DLOG("start to launch %s task, execId %d, retry %d", SCH_IS_LOCAL_EXEC_TASK(pJob, pTask) ? "LOCAL" : "REMOTE", + pTask->execId, pTask->retryTimes); SCH_LOG_TASK_START_TS(pTask); @@ -863,31 +983,12 @@ int32_t schLaunchTaskImpl(void *param) { SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC); } - SSubplan *plan = pTask->plan; - - if (NULL == pTask->msg) { // TODO add more detailed reason for failure - code = qSubPlanToMsg(plan, &pTask->msg, &pTask->msgLen); - if (TSDB_CODE_SUCCESS != code) { - SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, - pTask->msgLen); - SCH_ERR_JRET(code); - } else if (tsQueryPlannerTrace) { - char *msg = NULL; - int32_t msgLen = 0; - qSubPlanToString(plan, &msg, &msgLen); - SCH_TASK_DLOGL("physical plan len:%d, %s", msgLen, msg); - taosMemoryFree(msg); - } - } - - SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask)); - - if (SCH_IS_QUERY_JOB(pJob)) { - SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask)); + if (SCH_IS_LOCAL_EXEC_TASK(pJob, pTask)) { + SCH_ERR_JRET(schLaunchLocalTask(pJob, pTask)); + } else { + SCH_ERR_JRET(schLaunchRemoteTask(pJob, pTask)); } - SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType)); - _return: if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) { @@ -980,6 +1081,29 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { } } +int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) { + SCH_RET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask))); +} + +int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) { + void *pRsp = NULL; + SArray *explainRes = NULL; + + if (SCH_IS_EXPLAIN_JOB(pJob)) { + explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp)); + } + + SCH_ERR_RET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &pRsp, explainRes)); + + if (SCH_IS_EXPLAIN_JOB(pJob)) { + SCH_ERR_RET(schHandleExplainRes(explainRes)); + } + + SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS)); + + return TSDB_CODE_SUCCESS; +} + // Note: no more error processing, handled in function internal int32_t schLaunchFetchTask(SSchJob *pJob) { int32_t code = 0; @@ -990,7 +1114,11 @@ int32_t schLaunchFetchTask(SSchJob *pJob) { return TSDB_CODE_SUCCESS; } - SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask))); + if (SCH_IS_LOCAL_EXEC_TASK(pJob, pJob->fetchTask)) { + SCH_ERR_JRET(schExecLocalFetch(pJob, pJob->fetchTask)); + } else { + SCH_ERR_JRET(schExecRemoteFetch(pJob, pJob->fetchTask)); + } return TSDB_CODE_SUCCESS; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index f1ec7f5e043fc7810e54377f811e90d3dcc29b0f..0ccaef385744738969973b19a38182cbac4399d9 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -17,6 +17,7 @@ #include "schInt.h" #include "tmsg.h" #include "tref.h" +#include "qworker.h" SSchedulerMgmt schMgmt = { .jobRef = -1, @@ -192,4 +193,7 @@ void schedulerDestroy(void) { schMgmt.hbConnections = NULL; } SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock); + + qWorkerDestroy(&schMgmt.queryMgmt); + schMgmt.queryMgmt = NULL; } diff --git a/source/libs/sync/CMakeLists.txt b/source/libs/sync/CMakeLists.txt index 551849c6f29f3def8b275877aba28f7048ea1793..6025070cb72adcf9e3753ca674695edb48266b3f 100644 --- a/source/libs/sync/CMakeLists.txt +++ b/source/libs/sync/CMakeLists.txt @@ -15,6 +15,6 @@ target_include_directories( PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) -if(${BUILD_TEST}) +if(BUILD_TEST AND BUILD_SYNC_TEST) add_subdirectory(test) -endif(${BUILD_TEST}) +endif() diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 684d6a16e20f8d7556e74b34d3578738d265f411..3ae3f628866900b21f2fc873b8d3dcbf90186108 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -517,3 +517,94 @@ python3 ./test.py -f 2-query/tsbsQuery.py -Q 3 python3 ./test.py -f 2-query/sml.py -Q 3 python3 ./test.py -f 2-query/interp.py -Q 3 + +#------------querPolicy 4----------- + +python3 ./test.py -f 2-query/between.py -Q 4 +python3 ./test.py -f 2-query/distinct.py -Q 4 +python3 ./test.py -f 2-query/varchar.py -Q 4 +python3 ./test.py -f 2-query/ltrim.py -Q 4 +python3 ./test.py -f 2-query/rtrim.py -Q 4 +python3 ./test.py -f 2-query/length.py -Q 4 +python3 ./test.py -f 2-query/char_length.py -Q 4 +python3 ./test.py -f 2-query/upper.py -Q 4 +python3 ./test.py -f 2-query/lower.py -Q 4 +python3 ./test.py -f 2-query/join.py -Q 4 +python3 ./test.py -f 2-query/join2.py -Q 4 +python3 ./test.py -f 2-query/cast.py -Q 4 +python3 ./test.py -f 2-query/substr.py -Q 4 +python3 ./test.py -f 2-query/union.py -Q 4 +python3 ./test.py -f 2-query/union1.py -Q 4 +python3 ./test.py -f 2-query/concat.py -Q 4 +python3 ./test.py -f 2-query/concat2.py -Q 4 +python3 ./test.py -f 2-query/concat_ws.py -Q 4 +python3 ./test.py -f 2-query/concat_ws2.py -Q 4 +#python3 ./test.py -f 2-query/check_tsdb.py -Q 4 +python3 ./test.py -f 2-query/spread.py -Q 4 +python3 ./test.py -f 2-query/hyperloglog.py -Q 4 +python3 ./test.py -f 2-query/explain.py -Q 4 +python3 ./test.py -f 2-query/leastsquares.py -Q 4 +python3 ./test.py -f 2-query/timezone.py -Q 4 +python3 ./test.py -f 2-query/Now.py -Q 4 +python3 ./test.py -f 2-query/Today.py -Q 4 +python3 ./test.py -f 2-query/max.py -Q 4 +python3 ./test.py -f 2-query/min.py -Q 4 +python3 ./test.py -f 2-query/count.py -Q 4 +#python3 ./test.py -f 2-query/last.py -Q 4 +python3 ./test.py -f 2-query/first.py -Q 4 +python3 ./test.py -f 2-query/To_iso8601.py -Q 4 +python3 ./test.py -f 2-query/To_unixtimestamp.py -Q 4 +python3 ./test.py -f 2-query/timetruncate.py -Q 4 +python3 ./test.py -f 2-query/diff.py -Q 4 +python3 ./test.py -f 2-query/Timediff.py -Q 4 +python3 ./test.py -f 2-query/json_tag.py -Q 4 +python3 ./test.py -f 2-query/top.py -Q 4 +python3 ./test.py -f 2-query/bottom.py -Q 4 +python3 ./test.py -f 2-query/percentile.py -Q 4 +python3 ./test.py -f 2-query/apercentile.py -Q 4 +python3 ./test.py -f 2-query/abs.py -Q 4 +python3 ./test.py -f 2-query/ceil.py -Q 4 +python3 ./test.py -f 2-query/floor.py -Q 4 +python3 ./test.py -f 2-query/round.py -Q 4 +python3 ./test.py -f 2-query/log.py -Q 4 +python3 ./test.py -f 2-query/pow.py -Q 4 +python3 ./test.py -f 2-query/sqrt.py -Q 4 +python3 ./test.py -f 2-query/sin.py -Q 4 +python3 ./test.py -f 2-query/cos.py -Q 4 +python3 ./test.py -f 2-query/tan.py -Q 4 +python3 ./test.py -f 2-query/arcsin.py -Q 4 +python3 ./test.py -f 2-query/arccos.py -Q 4 +python3 ./test.py -f 2-query/arctan.py -Q 4 +python3 ./test.py -f 2-query/query_cols_tags_and_or.py -Q 4 +# python3 ./test.py -f 2-query/nestedQuery.py -Q 4 +# python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4 +# python3 ./test.py -f 2-query/avg.py -Q 4 +# python3 ./test.py -f 2-query/elapsed.py -Q 4 +python3 ./test.py -f 2-query/csum.py -Q 4 +#python3 ./test.py -f 2-query/mavg.py -Q 4 +python3 ./test.py -f 2-query/sample.py -Q 4 +python3 ./test.py -f 2-query/function_diff.py -Q 4 +python3 ./test.py -f 2-query/unique.py -Q 4 +python3 ./test.py -f 2-query/stateduration.py -Q 4 +python3 ./test.py -f 2-query/function_stateduration.py -Q 4 +python3 ./test.py -f 2-query/statecount.py -Q 4 +python3 ./test.py -f 2-query/tail.py -Q 4 +python3 ./test.py -f 2-query/ttl_comment.py -Q 4 +python3 ./test.py -f 2-query/distribute_agg_count.py -Q 4 +python3 ./test.py -f 2-query/distribute_agg_max.py -Q 4 +python3 ./test.py -f 2-query/distribute_agg_min.py -Q 4 +python3 ./test.py -f 2-query/distribute_agg_sum.py -Q 4 +python3 ./test.py -f 2-query/distribute_agg_spread.py -Q 4 +python3 ./test.py -f 2-query/distribute_agg_apercentile.py -Q 4 +python3 ./test.py -f 2-query/distribute_agg_avg.py -Q 4 +python3 ./test.py -f 2-query/distribute_agg_stddev.py -Q 4 +python3 ./test.py -f 2-query/twa.py -Q 4 +python3 ./test.py -f 2-query/irate.py -Q 4 +python3 ./test.py -f 2-query/function_null.py -Q 4 +python3 ./test.py -f 2-query/count_partition.py -Q 4 +python3 ./test.py -f 2-query/max_partition.py -Q 4 +python3 ./test.py -f 2-query/last_row.py -Q 4 +python3 ./test.py -f 2-query/tsbsQuery.py -Q 4 +#python3 ./test.py -f 2-query/sml.py -Q 4 +python3 ./test.py -f 2-query/interp.py -Q 4 +