From 2750ace3c311fdf88e48d8dc46f0316dcdb96a62 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 23 Sep 2022 16:37:09 +0800 Subject: [PATCH] fix: fix insert into select issue --- source/client/src/clientImpl.c | 37 ++++++++---------------------- source/libs/scheduler/inc/schInt.h | 6 +++-- 2 files changed, 13 insertions(+), 30 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 865ab8f611..be907ea1e2 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -483,7 +483,8 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) { int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) { SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad)); - + char *policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client"; + int32_t dbNum = taosArrayGetSize(pDbVgList); for (int32_t i = 0; i < dbNum; ++i) { SArray* pVg = taosArrayGetP(pDbVgList, i); @@ -504,20 +505,20 @@ int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArr int32_t vnodeNum = taosArrayGetSize(nodeList); if (vnodeNum > 0) { - tscDebug("0x%" PRIx64 " vnode policy, use vnode list, num:%d", pRequest->requestId, vnodeNum); + tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policy, vnodeNum); goto _return; } int32_t mnodeNum = taosArrayGetSize(pMnodeList); if (mnodeNum <= 0) { - tscDebug("0x%" PRIx64 " vnode policy, empty node list", pRequest->requestId); + tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policy); goto _return; } void* pData = taosArrayGet(pMnodeList, 0); taosArrayAddBatch(nodeList, pData, mnodeNum); - tscDebug("0x%" PRIx64 " vnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum); + tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum); _return: @@ -555,27 +556,14 @@ _return: return TSDB_CODE_SUCCESS; } - -int32_t buildClientPolicyNodeList(SRequestObj* pRequest, SArray** pNodeList) { - *pNodeList = taosArrayInit(1, sizeof(SQueryNodeLoad)); - SQueryNodeLoad load = {0}; - load.addr.nodeId = CLIENT_HANDLE; - - taosArrayPush(*pNodeList, &load); - - tscDebug("0x%" PRIx64 " client policy", pRequest->requestId); - - return TSDB_CODE_SUCCESS; -} - - int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) { SArray* pDbVgList = NULL; SArray* pQnodeList = NULL; int32_t code = 0; switch (tsQueryPolicy) { - case QUERY_POLICY_VNODE: { + case QUERY_POLICY_VNODE: + case QUERY_POLICY_CLIENT: { if (pResultMeta) { pDbVgList = taosArrayInit(4, POINTER_BYTES); @@ -614,10 +602,6 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList); break; } - case QUERY_POLICY_CLIENT: { - code = buildClientPolicyNodeList(pRequest, pNodeList); - break; - } default: tscError("unknown query policy: %d", tsQueryPolicy); return TSDB_CODE_TSC_APP_ERROR; @@ -640,7 +624,8 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* int32_t code = 0; switch (tsQueryPolicy) { - case QUERY_POLICY_VNODE: { + case QUERY_POLICY_VNODE: + case QUERY_POLICY_CLIENT: { int32_t dbNum = taosArrayGetSize(pRequest->dbList); if (dbNum > 0) { SCatalog* pCtg = NULL; @@ -678,10 +663,6 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList); break; } - case QUERY_POLICY_CLIENT: { - code = buildClientPolicyNodeList(pRequest, pNodeList); - break; - } default: tscError("unknown query policy: %d", tsQueryPolicy); return TSDB_CODE_TSC_APP_ERROR; diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index e017606035..7ced4f626c 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -236,6 +236,7 @@ typedef struct SSchTask { typedef struct SSchJobAttr { EExplainMode explainMode; bool queryJob; + bool insertJob; bool needFetch; bool needFlowCtrl; bool localExec; @@ -307,7 +308,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_IS_DATA_BIND_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)) #define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum) #define SCH_IS_DATA_MERGE_TASK(task) (!SCH_IS_DATA_BIND_TASK(task)) -#define SCH_IS_LOCAL_EXEC_TASK(_job, _task) ((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && SCH_IS_DATA_MERGE_TASK(_task)) +#define SCH_IS_LOCAL_EXEC_TASK(_job, _task) ((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && (!SCH_IS_INSERT_JOB(_job)) && (!SCH_IS_DATA_BIND_QRY_TASK(_task))) #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) @@ -330,8 +331,9 @@ extern SSchedulerMgmt schMgmt; #define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH) #define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY) -#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0) +#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } else { (_job)->attr.insertJob = true; } } while (0) #define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) +#define SCH_IS_INSERT_JOB(_job) ((_job)->attr.insertJob) #define SCH_JOB_NEED_FETCH(_job) ((_job)->attr.needFetch) #define SCH_JOB_NEED_WAIT(_job) (!SCH_IS_QUERY_JOB(_job)) #define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job)) -- GitLab