提交 2750ace3 编写于 作者: D dapan1121

fix: fix insert into select issue

上级 3fb3aa7b
...@@ -483,7 +483,8 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) { ...@@ -483,7 +483,8 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) { int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad)); SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
char *policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client";
int32_t dbNum = taosArrayGetSize(pDbVgList); int32_t dbNum = taosArrayGetSize(pDbVgList);
for (int32_t i = 0; i < dbNum; ++i) { for (int32_t i = 0; i < dbNum; ++i) {
SArray* pVg = taosArrayGetP(pDbVgList, i); SArray* pVg = taosArrayGetP(pDbVgList, i);
...@@ -504,20 +505,20 @@ int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArr ...@@ -504,20 +505,20 @@ int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArr
int32_t vnodeNum = taosArrayGetSize(nodeList); int32_t vnodeNum = taosArrayGetSize(nodeList);
if (vnodeNum > 0) { if (vnodeNum > 0) {
tscDebug("0x%" PRIx64 " vnode policy, use vnode list, num:%d", pRequest->requestId, vnodeNum); tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policy, vnodeNum);
goto _return; goto _return;
} }
int32_t mnodeNum = taosArrayGetSize(pMnodeList); int32_t mnodeNum = taosArrayGetSize(pMnodeList);
if (mnodeNum <= 0) { if (mnodeNum <= 0) {
tscDebug("0x%" PRIx64 " vnode policy, empty node list", pRequest->requestId); tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policy);
goto _return; goto _return;
} }
void* pData = taosArrayGet(pMnodeList, 0); void* pData = taosArrayGet(pMnodeList, 0);
taosArrayAddBatch(nodeList, pData, mnodeNum); taosArrayAddBatch(nodeList, pData, mnodeNum);
tscDebug("0x%" PRIx64 " vnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum); tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum);
_return: _return:
...@@ -555,27 +556,14 @@ _return: ...@@ -555,27 +556,14 @@ _return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t buildClientPolicyNodeList(SRequestObj* pRequest, SArray** pNodeList) {
*pNodeList = taosArrayInit(1, sizeof(SQueryNodeLoad));
SQueryNodeLoad load = {0};
load.addr.nodeId = CLIENT_HANDLE;
taosArrayPush(*pNodeList, &load);
tscDebug("0x%" PRIx64 " client policy", pRequest->requestId);
return TSDB_CODE_SUCCESS;
}
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) { int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
SArray* pDbVgList = NULL; SArray* pDbVgList = NULL;
SArray* pQnodeList = NULL; SArray* pQnodeList = NULL;
int32_t code = 0; int32_t code = 0;
switch (tsQueryPolicy) { switch (tsQueryPolicy) {
case QUERY_POLICY_VNODE: { case QUERY_POLICY_VNODE:
case QUERY_POLICY_CLIENT: {
if (pResultMeta) { if (pResultMeta) {
pDbVgList = taosArrayInit(4, POINTER_BYTES); pDbVgList = taosArrayInit(4, POINTER_BYTES);
...@@ -614,10 +602,6 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray ...@@ -614,10 +602,6 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray
code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList); code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
break; break;
} }
case QUERY_POLICY_CLIENT: {
code = buildClientPolicyNodeList(pRequest, pNodeList);
break;
}
default: default:
tscError("unknown query policy: %d", tsQueryPolicy); tscError("unknown query policy: %d", tsQueryPolicy);
return TSDB_CODE_TSC_APP_ERROR; return TSDB_CODE_TSC_APP_ERROR;
...@@ -640,7 +624,8 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* ...@@ -640,7 +624,8 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray*
int32_t code = 0; int32_t code = 0;
switch (tsQueryPolicy) { switch (tsQueryPolicy) {
case QUERY_POLICY_VNODE: { case QUERY_POLICY_VNODE:
case QUERY_POLICY_CLIENT: {
int32_t dbNum = taosArrayGetSize(pRequest->dbList); int32_t dbNum = taosArrayGetSize(pRequest->dbList);
if (dbNum > 0) { if (dbNum > 0) {
SCatalog* pCtg = NULL; SCatalog* pCtg = NULL;
...@@ -678,10 +663,6 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* ...@@ -678,10 +663,6 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray*
code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList); code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
break; break;
} }
case QUERY_POLICY_CLIENT: {
code = buildClientPolicyNodeList(pRequest, pNodeList);
break;
}
default: default:
tscError("unknown query policy: %d", tsQueryPolicy); tscError("unknown query policy: %d", tsQueryPolicy);
return TSDB_CODE_TSC_APP_ERROR; return TSDB_CODE_TSC_APP_ERROR;
......
...@@ -236,6 +236,7 @@ typedef struct SSchTask { ...@@ -236,6 +236,7 @@ typedef struct SSchTask {
typedef struct SSchJobAttr { typedef struct SSchJobAttr {
EExplainMode explainMode; EExplainMode explainMode;
bool queryJob; bool queryJob;
bool insertJob;
bool needFetch; bool needFetch;
bool needFlowCtrl; bool needFlowCtrl;
bool localExec; bool localExec;
...@@ -307,7 +308,7 @@ extern SSchedulerMgmt schMgmt; ...@@ -307,7 +308,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_IS_DATA_BIND_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)) #define SCH_IS_DATA_BIND_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY))
#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum) #define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum)
#define SCH_IS_DATA_MERGE_TASK(task) (!SCH_IS_DATA_BIND_TASK(task)) #define SCH_IS_DATA_MERGE_TASK(task) (!SCH_IS_DATA_BIND_TASK(task))
#define SCH_IS_LOCAL_EXEC_TASK(_job, _task) ((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && SCH_IS_DATA_MERGE_TASK(_task)) #define SCH_IS_LOCAL_EXEC_TASK(_job, _task) ((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && (!SCH_IS_INSERT_JOB(_job)) && (!SCH_IS_DATA_BIND_QRY_TASK(_task)))
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
...@@ -330,8 +331,9 @@ extern SSchedulerMgmt schMgmt; ...@@ -330,8 +331,9 @@ extern SSchedulerMgmt schMgmt;
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH) #define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY) #define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY)
#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0) #define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } else { (_job)->attr.insertJob = true; } } while (0)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) #define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
#define SCH_IS_INSERT_JOB(_job) ((_job)->attr.insertJob)
#define SCH_JOB_NEED_FETCH(_job) ((_job)->attr.needFetch) #define SCH_JOB_NEED_FETCH(_job) ((_job)->attr.needFetch)
#define SCH_JOB_NEED_WAIT(_job) (!SCH_IS_QUERY_JOB(_job)) #define SCH_JOB_NEED_WAIT(_job) (!SCH_IS_QUERY_JOB(_job))
#define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job)) #define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册