From a67eb1a9a04271a9361ba9243aeedb0083a1db79 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 6 Jul 2022 11:55:47 +0800 Subject: [PATCH] eat: sql command 'insert ... select' --- source/client/src/clientStmt.c | 2 +- source/libs/nodes/src/nodesCloneFuncs.c | 3 +- source/libs/nodes/src/nodesCodeFuncs.c | 58 +++++++++++++++++++ source/libs/nodes/src/nodesUtilFuncs.c | 7 +++ source/libs/parser/src/parTranslater.c | 3 + source/libs/planner/src/planLogicCreater.c | 1 + source/libs/planner/src/planOptimizer.c | 38 +++++++----- source/libs/planner/src/planPhysiCreater.c | 8 ++- source/libs/planner/src/planSpliter.c | 56 +++++++++++++++++- source/libs/planner/test/planOtherTest.cpp | 2 +- .../1-insert/test_stmt_muti_insert_query.py | 7 ++- 11 files changed, 158 insertions(+), 27 deletions(-) diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index a1c8eb0710..1e0f30695d 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -848,7 +848,7 @@ int stmtIsInsert(TAOS_STMT* stmt, int* insert) { if (pStmt->sql.type) { *insert = (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type); } else { - *insert = qIsInsertValuesSql(pStmt->sql.sqlStr, 0); + *insert = qIsInsertValuesSql(pStmt->sql.sqlStr, pStmt->sql.sqlLen); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 25a41fb15c..1a1aca8bdb 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -165,7 +165,7 @@ static int32_t valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) { memcpy(pDst->datum.p, pSrc->datum.p, len); break; } - case TSDB_DATA_TYPE_JSON:{ + case TSDB_DATA_TYPE_JSON: { int32_t len = getJsonValueLen(pSrc->datum.p); pDst->datum.p = taosMemoryCalloc(1, len); if (NULL == pDst->datum.p) { @@ -397,6 +397,7 @@ static int32_t logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModi COPY_SCALAR_FIELD(tableType); COPY_CHAR_ARRAY_FIELD(tableFName); COPY_OBJECT_FIELD(deleteTimeRange, sizeof(STimeWindow)); + CLONE_OBJECT_FIELD(pVgroupList, vgroupsInfoClone); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index cd0743bda1..44bfa39dbd 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -273,6 +273,8 @@ const char* nodesNodeName(ENodeType type) { return "PhysiDispatch"; case QUERY_NODE_PHYSICAL_PLAN_INSERT: return "PhysiInsert"; + case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: + return "PhysiQueryInsert"; case QUERY_NODE_PHYSICAL_PLAN_DELETE: return "PhysiDelete"; case QUERY_NODE_PHYSICAL_SUBPLAN: @@ -2212,6 +2214,58 @@ static int32_t physiDispatchNodeToJson(const void* pObj, SJson* pJson) { return static int32_t jsonToPhysiDispatchNode(const SJson* pJson, void* pObj) { return jsonToPhysicDataSinkNode(pJson, pObj); } +static const char* jkQueryInsertPhysiPlanTableId = "TableId"; +static const char* jkQueryInsertPhysiPlanTableType = "TableType"; +static const char* jkQueryInsertPhysiPlanTableFName = "TableFName"; +static const char* jkQueryInsertPhysiPlanVgId = "VgId"; +static const char* jkQueryInsertPhysiPlanEpSet = "EpSet"; + +static int32_t physiQueryInsertNodeToJson(const void* pObj, SJson* pJson) { + const SQueryInserterNode* pNode = (const SQueryInserterNode*)pObj; + + int32_t code = physicDataSinkNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanTableId, pNode->tableId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanTableType, pNode->tableType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddStringToObject(pJson, jkQueryInsertPhysiPlanTableFName, pNode->tableFName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanVgId, pNode->vgId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkQueryInsertPhysiPlanEpSet, epSetToJson, &pNode->epSet); + } + + return code; +} + +static int32_t jsonToPhysiQueryInsertNode(const SJson* pJson, void* pObj) { + SQueryInserterNode* pNode = (SQueryInserterNode*)pObj; + + int32_t code = jsonToPhysicDataSinkNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetUBigIntValue(pJson, jkQueryInsertPhysiPlanTableId, &pNode->tableId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkQueryInsertPhysiPlanTableType, &pNode->tableType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetStringValue(pJson, jkQueryInsertPhysiPlanTableFName, pNode->tableFName); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkQueryInsertPhysiPlanVgId, &pNode->vgId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonToObject(pJson, jkQueryInsertPhysiPlanEpSet, jsonToEpSet, &pNode->epSet); + } + + return code; +} + static const char* jkDeletePhysiPlanTableId = "TableId"; static const char* jkDeletePhysiPlanTableType = "TableType"; static const char* jkDeletePhysiPlanTableFName = "TableFName"; @@ -4234,6 +4288,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { return physiDispatchNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_INSERT: break; + case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: + return physiQueryInsertNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_DELETE: return physiDeleteNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_SUBPLAN: @@ -4376,6 +4432,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToPhysiInterpFuncNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: return jsonToPhysiDispatchNode(pJson, pObj); + case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: + return jsonToPhysiQueryInsertNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_DELETE: return jsonToPhysiDeleteNode(pJson, pObj); case QUERY_NODE_PHYSICAL_SUBPLAN: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 0c9cb764a5..cdc26547f7 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -327,6 +327,8 @@ SNode* nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SDataDispatcherNode)); case QUERY_NODE_PHYSICAL_PLAN_INSERT: return makeNode(type, sizeof(SDataInserterNode)); + case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: + return makeNode(type, sizeof(SQueryInserterNode)); case QUERY_NODE_PHYSICAL_PLAN_DELETE: return makeNode(type, sizeof(SDataDeleterNode)); case QUERY_NODE_PHYSICAL_SUBPLAN: @@ -934,6 +936,11 @@ void nodesDestroyNode(SNode* pNode) { taosMemoryFreeClear(pSink->pData); break; } + case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: { + SQueryInserterNode* pSink = (SQueryInserterNode*)pNode; + destroyDataSinkNode((SDataSinkNode*)pSink); + break; + } case QUERY_NODE_PHYSICAL_PLAN_DELETE: { SDataDeleterNode* pSink = (SDataDeleterNode*)pNode; destroyDataSinkNode((SDataSinkNode*)pSink); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index c2beb8a743..1f4294279b 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2845,6 +2845,9 @@ static int32_t translateInsert(STranslateContext* pCxt, SInsertStmt* pInsert) { if (TSDB_CODE_SUCCESS == code) { code = translateExprList(pCxt, pInsert->pCols); } + if (TSDB_CODE_SUCCESS == code) { + code = resetTranslateNamespace(pCxt); + } if (TSDB_CODE_SUCCESS == code) { code = translateQuery(pCxt, pInsert->pQuery); } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 956e4ff79f..74d780b8c7 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -1282,6 +1282,7 @@ static int32_t createVnodeModifLogicNodeByInsert(SLogicPlanContext* pCxt, SInser pModify->tableType = pRealTable->pMeta->tableType; snprintf(pModify->tableFName, sizeof(pModify->tableFName), "%d.%s.%s", pCxt->pPlanCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName); + TSWAP(pModify->pVgroupList, pRealTable->pVgroupList); *pLogicNode = (SLogicNode*)pModify; return TSDB_CODE_SUCCESS; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 202e590955..ea8bae8259 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -485,7 +485,7 @@ static int32_t pushDownCondOptPushCondToProject(SOptimizeContext* pCxt, SProject return pushDownCondOptAppendCond(&pProject->node.pConditions, pCond); } -static int32_t pushDownCondOptPushCondToJoin(SOptimizeContext* pCxt, SJoinLogicNode * pJoin, SNode** pCond) { +static int32_t pushDownCondOptPushCondToJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode** pCond) { return pushDownCondOptAppendCond(&pJoin->node.pConditions, pCond); } @@ -557,9 +557,9 @@ static int32_t pushDownCondOptCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogic static int32_t pushDownCondOptPartJoinOnCondLogicCond(SJoinLogicNode* pJoin, SNode** ppMergeCond, SNode** ppOnCond) { SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pJoin->pOnConditions); - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; SNodeList* pOnConds = NULL; - SNode* pCond = NULL; + SNode* pCond = NULL; FOREACH(pCond, pLogicCond->pParameterList) { if (pushDownCondOptIsPriKeyEqualCond(pJoin, pCond)) { *ppMergeCond = nodesCloneNode(pCond); @@ -604,8 +604,8 @@ static int32_t pushDownCondOptPartJoinOnCond(SJoinLogicNode* pJoin, SNode** ppMe static int32_t pushDownCondOptJoinExtractMergeCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { int32_t code = pushDownCondOptCheckJoinOnCond(pCxt, pJoin); - SNode* pJoinMergeCond = NULL; - SNode* pJoinOnCond = NULL; + SNode* pJoinMergeCond = NULL; + SNode* pJoinOnCond = NULL; if (TSDB_CODE_SUCCESS == code) { code = pushDownCondOptPartJoinOnCond(pJoin, &pJoinMergeCond, &pJoinOnCond); } @@ -820,12 +820,12 @@ static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAg typedef struct SRewriteProjCondContext { SProjectLogicNode* pProj; - int32_t errCode; -}SRewriteProjCondContext; + int32_t errCode; +} SRewriteProjCondContext; static EDealRes rewriteProjectCondForPushDownImpl(SNode** ppNode, void* pContext) { SRewriteProjCondContext* pCxt = pContext; - SProjectLogicNode* pProj = pCxt->pProj; + SProjectLogicNode* pProj = pCxt->pProj; if (QUERY_NODE_COLUMN == nodeType(*ppNode)) { SNode* pTarget = NULL; FOREACH(pTarget, pProj->node.pTargets) { @@ -840,18 +840,19 @@ static EDealRes rewriteProjectCondForPushDownImpl(SNode** ppNode, void* pContext } nodesDestroyNode(*ppNode); *ppNode = pExpr; - } // end if expr alias name equal column name - } // end for each project - } // end if target node equals cond column node - } // end for each targets + } // end if expr alias name equal column name + } // end for each project + } // end if target node equals cond column node + } // end for each targets return DEAL_RES_IGNORE_CHILD; } return DEAL_RES_CONTINUE; } -static int32_t rewriteProjectCondForPushDown(SOptimizeContext* pCxt, SProjectLogicNode* pProject, SNode** ppProjectCond) { +static int32_t rewriteProjectCondForPushDown(SOptimizeContext* pCxt, SProjectLogicNode* pProject, + SNode** ppProjectCond) { SRewriteProjCondContext cxt = {.pProj = pProject, .errCode = TSDB_CODE_SUCCESS}; - SNode* pProjectCond = pProject->node.pConditions; + SNode* pProjectCond = pProject->node.pConditions; nodesRewriteExpr(&pProjectCond, rewriteProjectCondForPushDownImpl, &cxt); *ppProjectCond = pProjectCond; pProject->node.pConditions = NULL; @@ -873,7 +874,7 @@ static int32_t pushDownCondOptDealProject(SOptimizeContext* pCxt, SProjectLogicN } int32_t code = TSDB_CODE_SUCCESS; - SNode* pProjCond = NULL; + SNode* pProjCond = NULL; code = rewriteProjectCondForPushDown(pCxt, pProject, &pProjCond); if (TSDB_CODE_SUCCESS == code) { SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pProject->node.pChildren, 0); @@ -2082,13 +2083,18 @@ static const int32_t optimizeRuleNum = (sizeof(optimizeRuleSet) / sizeof(SOptimi static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) { char* pStr = NULL; nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL); - qDebugL("apply optimize %s rule: %s", pRuleName, pStr); + if (NULL == pRuleName) { + qDebugL("before optimize: %s", pStr); + } else { + qDebugL("apply optimize %s rule: %s", pRuleName, pStr); + } taosMemoryFree(pStr); } static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) { SOptimizeContext cxt = {.pPlanCxt = pCxt, .optimized = false}; bool optimized = false; + dumpLogicSubplan(NULL, pLogicSubplan); do { optimized = false; for (int32_t i = 0; i < optimizeRuleNum; ++i) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 7d471357f0..7ec3af31b2 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1502,7 +1502,7 @@ static int32_t buildInsertValuesSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLog return createDataInserter(pCxt, pModify->pVgDataBlocks, &pSubplan->pDataSink); } -static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, const SPhysiNode* pRoot, +static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan, SDataSinkNode** pSink) { SQueryInserterNode* pInserter = (SQueryInserterNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT); if (NULL == pInserter) { @@ -1514,10 +1514,12 @@ static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNod strcpy(pInserter->tableFName, pModify->tableFName); pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId; pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet; + vgroupInfoToNodeAddr(pModify->pVgroupList->vgroups, &pSubplan->execNode); int32_t code = TSDB_CODE_SUCCESS; - pInserter->sink.pInputDataBlockDesc = (SDataBlockDescNode*)nodesCloneNode((SNode*)pRoot->pOutputDataBlockDesc); + pInserter->sink.pInputDataBlockDesc = + (SDataBlockDescNode*)nodesCloneNode((SNode*)pSubplan->pNode->pOutputDataBlockDesc); if (NULL == pInserter->sink.pInputDataBlockDesc) { code = TSDB_CODE_OUT_OF_MEMORY; } @@ -1535,7 +1537,7 @@ static int32_t buildInsertSelectSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLog int32_t code = createPhysiNode(pCxt, (SLogicNode*)nodesListGetNode(pModify->node.pChildren, 0), pSubplan, &pSubplan->pNode); if (TSDB_CODE_SUCCESS == code) { - code = createQueryInserter(pCxt, pModify, pSubplan->pNode, &pSubplan->pDataSink); + code = createQueryInserter(pCxt, pModify, pSubplan, &pSubplan->pDataSink); } pSubplan->msgType = TDMT_VND_SUBMIT; return code; diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 7644fc3b19..0863b5f21f 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -20,6 +20,7 @@ #define SPLIT_FLAG_MASK(n) (1 << n) #define SPLIT_FLAG_STABLE_SPLIT SPLIT_FLAG_MASK(0) +#define SPLIT_FLAG_INSERT_SPLIT SPLIT_FLAG_MASK(1) #define SPLIT_FLAG_SET_MASK(val, mask) (val) |= (mask) #define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0) @@ -1196,6 +1197,41 @@ static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { return code; } +typedef struct SInsertSelectSplitInfo { + SLogicNode* pQueryRoot; + SLogicSubplan* pSubplan; +} SInsertSelectSplitInfo; + +static bool insSelSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, + SInsertSelectSplitInfo* pInfo) { + if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pNode) && 1 == LIST_LENGTH(pNode->pChildren) && + MODIFY_TABLE_TYPE_INSERT == ((SVnodeModifyLogicNode*)pNode)->modifyType) { + pInfo->pQueryRoot = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0); + pInfo->pSubplan = pSubplan; + return true; + } + return false; +} + +static int32_t insertSelectSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { + SInsertSelectSplitInfo info = {0}; + if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_INSERT_SPLIT, (FSplFindSplitNode)insSelSplFindSplitNode, &info)) { + return TSDB_CODE_SUCCESS; + } + + int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pQueryRoot, info.pSubplan->subplanType); + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, info.pQueryRoot, 0)); + } + if (TSDB_CODE_SUCCESS == code) { + info.pSubplan->subplanType = SUBPLAN_TYPE_MODIFY; + SPLIT_FLAG_SET_MASK(info.pSubplan->splitFlag, SPLIT_FLAG_INSERT_SPLIT); + } + ++(pCxt->groupId); + pCxt->split = true; + return code; +} + typedef struct SQnodeSplitInfo { SLogicNode* pSplitNode; SLogicSubplan* pSubplan; @@ -1249,7 +1285,8 @@ static const SSplitRule splitRuleSet[] = { {.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit}, {.pName = "UnionAllSplit", .splitFunc = unionAllSplit}, {.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit}, - {.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit} + {.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit}, + {.pName = "InsertSelectSplit", .splitFunc = insertSelectSplit} }; // clang-format on @@ -1258,7 +1295,11 @@ static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule)); static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) { char* pStr = NULL; nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL); - qDebugL("apply split %s rule: %s", pRuleName, pStr); + if (NULL == pRuleName) { + qDebugL("before split: %s", pStr); + } else { + qDebugL("apply split %s rule: %s", pRuleName, pStr); + } taosMemoryFree(pStr); } @@ -1266,6 +1307,7 @@ static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) { SSplitContext cxt = { .pPlanCxt = pCxt, .queryId = pSubplan->id.queryId, .groupId = pSubplan->id.groupId + 1, .split = false}; bool split = false; + dumpLogicSubplan(NULL, pSubplan); do { split = false; for (int32_t i = 0; i < splitRuleNum; ++i) { @@ -1293,8 +1335,16 @@ static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) { FOREACH(pChild, pNode->pChildren) { setVgroupsInfo((SLogicNode*)pChild, pSubplan); } } +static bool needSplitSubplan(SLogicSubplan* pLogicSubplan) { + if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY != nodeType(pLogicSubplan->pNode)) { + return true; + } + SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pLogicSubplan->pNode; + return (MODIFY_TABLE_TYPE_INSERT == pModify->modifyType && NULL != pModify->node.pChildren); +} + int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) { - if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pLogicSubplan->pNode)) { + if (!needSplitSubplan(pLogicSubplan)) { setVgroupsInfo(pLogicSubplan->pNode, pLogicSubplan); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/test/planOtherTest.cpp b/source/libs/planner/test/planOtherTest.cpp index 10e792cdc5..7fd38cc5c8 100644 --- a/source/libs/planner/test/planOtherTest.cpp +++ b/source/libs/planner/test/planOtherTest.cpp @@ -95,5 +95,5 @@ TEST_F(PlanOtherTest, delete) { TEST_F(PlanOtherTest, insert) { useDb("root", "test"); - // run("INSERT INTO t1 SELECT * FROM t1"); + run("INSERT INTO t1 SELECT * FROM t1"); } diff --git a/tests/system-test/1-insert/test_stmt_muti_insert_query.py b/tests/system-test/1-insert/test_stmt_muti_insert_query.py index de10b2f6b9..9fb802b96b 100644 --- a/tests/system-test/1-insert/test_stmt_muti_insert_query.py +++ b/tests/system-test/1-insert/test_stmt_muti_insert_query.py @@ -96,7 +96,7 @@ class TDTestCase: ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)", ) # conn.load_table_info("log") - + tdLog.debug("statement start") start = datetime.now() stmt = conn.statement("insert into stb1 values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") @@ -118,8 +118,11 @@ class TDTestCase: params[14].nchar(["涛思数据", None, "a long string with 中文字符"]) params[15].timestamp([None, None, 1626861392591]) # print(type(stmt)) + tdLog.debug("bind_param_batch start") stmt.bind_param_batch(params) + tdLog.debug("bind_param_batch end") stmt.execute() + tdLog.debug("execute end") end = datetime.now() print("elapsed time: ", end - start) assert stmt.affected_rows == 3 @@ -155,7 +158,7 @@ class TDTestCase: print(rows1) assert str(rows1[0][0]) == "2021-07-21 17:56:32.589000" assert rows1[0][10] == 3 - + tdLog.debug("close start") stmt.close() -- GitLab