提交 a67eb1a9 编写于 作者: X Xiaoyu Wang

eat: sql command 'insert ... select'

上级 331dca1f
...@@ -848,7 +848,7 @@ int stmtIsInsert(TAOS_STMT* stmt, int* insert) { ...@@ -848,7 +848,7 @@ int stmtIsInsert(TAOS_STMT* stmt, int* insert) {
if (pStmt->sql.type) { if (pStmt->sql.type) {
*insert = (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type); *insert = (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type);
} else { } else {
*insert = qIsInsertValuesSql(pStmt->sql.sqlStr, 0); *insert = qIsInsertValuesSql(pStmt->sql.sqlStr, pStmt->sql.sqlLen);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -165,7 +165,7 @@ static int32_t valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) { ...@@ -165,7 +165,7 @@ static int32_t valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) {
memcpy(pDst->datum.p, pSrc->datum.p, len); memcpy(pDst->datum.p, pSrc->datum.p, len);
break; break;
} }
case TSDB_DATA_TYPE_JSON:{ case TSDB_DATA_TYPE_JSON: {
int32_t len = getJsonValueLen(pSrc->datum.p); int32_t len = getJsonValueLen(pSrc->datum.p);
pDst->datum.p = taosMemoryCalloc(1, len); pDst->datum.p = taosMemoryCalloc(1, len);
if (NULL == pDst->datum.p) { if (NULL == pDst->datum.p) {
...@@ -397,6 +397,7 @@ static int32_t logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModi ...@@ -397,6 +397,7 @@ static int32_t logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModi
COPY_SCALAR_FIELD(tableType); COPY_SCALAR_FIELD(tableType);
COPY_CHAR_ARRAY_FIELD(tableFName); COPY_CHAR_ARRAY_FIELD(tableFName);
COPY_OBJECT_FIELD(deleteTimeRange, sizeof(STimeWindow)); COPY_OBJECT_FIELD(deleteTimeRange, sizeof(STimeWindow));
CLONE_OBJECT_FIELD(pVgroupList, vgroupsInfoClone);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -273,6 +273,8 @@ const char* nodesNodeName(ENodeType type) { ...@@ -273,6 +273,8 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiDispatch"; return "PhysiDispatch";
case QUERY_NODE_PHYSICAL_PLAN_INSERT: case QUERY_NODE_PHYSICAL_PLAN_INSERT:
return "PhysiInsert"; return "PhysiInsert";
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
return "PhysiQueryInsert";
case QUERY_NODE_PHYSICAL_PLAN_DELETE: case QUERY_NODE_PHYSICAL_PLAN_DELETE:
return "PhysiDelete"; return "PhysiDelete";
case QUERY_NODE_PHYSICAL_SUBPLAN: case QUERY_NODE_PHYSICAL_SUBPLAN:
...@@ -2212,6 +2214,58 @@ static int32_t physiDispatchNodeToJson(const void* pObj, SJson* pJson) { return ...@@ -2212,6 +2214,58 @@ static int32_t physiDispatchNodeToJson(const void* pObj, SJson* pJson) { return
static int32_t jsonToPhysiDispatchNode(const SJson* pJson, void* pObj) { return jsonToPhysicDataSinkNode(pJson, pObj); } static int32_t jsonToPhysiDispatchNode(const SJson* pJson, void* pObj) { return jsonToPhysicDataSinkNode(pJson, pObj); }
static const char* jkQueryInsertPhysiPlanTableId = "TableId";
static const char* jkQueryInsertPhysiPlanTableType = "TableType";
static const char* jkQueryInsertPhysiPlanTableFName = "TableFName";
static const char* jkQueryInsertPhysiPlanVgId = "VgId";
static const char* jkQueryInsertPhysiPlanEpSet = "EpSet";
static int32_t physiQueryInsertNodeToJson(const void* pObj, SJson* pJson) {
const SQueryInserterNode* pNode = (const SQueryInserterNode*)pObj;
int32_t code = physicDataSinkNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanTableId, pNode->tableId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanTableType, pNode->tableType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkQueryInsertPhysiPlanTableFName, pNode->tableFName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanVgId, pNode->vgId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkQueryInsertPhysiPlanEpSet, epSetToJson, &pNode->epSet);
}
return code;
}
static int32_t jsonToPhysiQueryInsertNode(const SJson* pJson, void* pObj) {
SQueryInserterNode* pNode = (SQueryInserterNode*)pObj;
int32_t code = jsonToPhysicDataSinkNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetUBigIntValue(pJson, jkQueryInsertPhysiPlanTableId, &pNode->tableId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkQueryInsertPhysiPlanTableType, &pNode->tableType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkQueryInsertPhysiPlanTableFName, pNode->tableFName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkQueryInsertPhysiPlanVgId, &pNode->vgId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkQueryInsertPhysiPlanEpSet, jsonToEpSet, &pNode->epSet);
}
return code;
}
static const char* jkDeletePhysiPlanTableId = "TableId"; static const char* jkDeletePhysiPlanTableId = "TableId";
static const char* jkDeletePhysiPlanTableType = "TableType"; static const char* jkDeletePhysiPlanTableType = "TableType";
static const char* jkDeletePhysiPlanTableFName = "TableFName"; static const char* jkDeletePhysiPlanTableFName = "TableFName";
...@@ -4234,6 +4288,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { ...@@ -4234,6 +4288,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return physiDispatchNodeToJson(pObj, pJson); return physiDispatchNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_INSERT: case QUERY_NODE_PHYSICAL_PLAN_INSERT:
break; break;
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
return physiQueryInsertNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_DELETE: case QUERY_NODE_PHYSICAL_PLAN_DELETE:
return physiDeleteNodeToJson(pObj, pJson); return physiDeleteNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_SUBPLAN: case QUERY_NODE_PHYSICAL_SUBPLAN:
...@@ -4376,6 +4432,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { ...@@ -4376,6 +4432,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToPhysiInterpFuncNode(pJson, pObj); return jsonToPhysiInterpFuncNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return jsonToPhysiDispatchNode(pJson, pObj); return jsonToPhysiDispatchNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
return jsonToPhysiQueryInsertNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_DELETE: case QUERY_NODE_PHYSICAL_PLAN_DELETE:
return jsonToPhysiDeleteNode(pJson, pObj); return jsonToPhysiDeleteNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_SUBPLAN: case QUERY_NODE_PHYSICAL_SUBPLAN:
......
...@@ -327,6 +327,8 @@ SNode* nodesMakeNode(ENodeType type) { ...@@ -327,6 +327,8 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SDataDispatcherNode)); return makeNode(type, sizeof(SDataDispatcherNode));
case QUERY_NODE_PHYSICAL_PLAN_INSERT: case QUERY_NODE_PHYSICAL_PLAN_INSERT:
return makeNode(type, sizeof(SDataInserterNode)); return makeNode(type, sizeof(SDataInserterNode));
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
return makeNode(type, sizeof(SQueryInserterNode));
case QUERY_NODE_PHYSICAL_PLAN_DELETE: case QUERY_NODE_PHYSICAL_PLAN_DELETE:
return makeNode(type, sizeof(SDataDeleterNode)); return makeNode(type, sizeof(SDataDeleterNode));
case QUERY_NODE_PHYSICAL_SUBPLAN: case QUERY_NODE_PHYSICAL_SUBPLAN:
...@@ -934,6 +936,11 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -934,6 +936,11 @@ void nodesDestroyNode(SNode* pNode) {
taosMemoryFreeClear(pSink->pData); taosMemoryFreeClear(pSink->pData);
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
SQueryInserterNode* pSink = (SQueryInserterNode*)pNode;
destroyDataSinkNode((SDataSinkNode*)pSink);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_DELETE: { case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
SDataDeleterNode* pSink = (SDataDeleterNode*)pNode; SDataDeleterNode* pSink = (SDataDeleterNode*)pNode;
destroyDataSinkNode((SDataSinkNode*)pSink); destroyDataSinkNode((SDataSinkNode*)pSink);
......
...@@ -2845,6 +2845,9 @@ static int32_t translateInsert(STranslateContext* pCxt, SInsertStmt* pInsert) { ...@@ -2845,6 +2845,9 @@ static int32_t translateInsert(STranslateContext* pCxt, SInsertStmt* pInsert) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = translateExprList(pCxt, pInsert->pCols); code = translateExprList(pCxt, pInsert->pCols);
} }
if (TSDB_CODE_SUCCESS == code) {
code = resetTranslateNamespace(pCxt);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = translateQuery(pCxt, pInsert->pQuery); code = translateQuery(pCxt, pInsert->pQuery);
} }
......
...@@ -1282,6 +1282,7 @@ static int32_t createVnodeModifLogicNodeByInsert(SLogicPlanContext* pCxt, SInser ...@@ -1282,6 +1282,7 @@ static int32_t createVnodeModifLogicNodeByInsert(SLogicPlanContext* pCxt, SInser
pModify->tableType = pRealTable->pMeta->tableType; pModify->tableType = pRealTable->pMeta->tableType;
snprintf(pModify->tableFName, sizeof(pModify->tableFName), "%d.%s.%s", pCxt->pPlanCxt->acctId, snprintf(pModify->tableFName, sizeof(pModify->tableFName), "%d.%s.%s", pCxt->pPlanCxt->acctId,
pRealTable->table.dbName, pRealTable->table.tableName); pRealTable->table.dbName, pRealTable->table.tableName);
TSWAP(pModify->pVgroupList, pRealTable->pVgroupList);
*pLogicNode = (SLogicNode*)pModify; *pLogicNode = (SLogicNode*)pModify;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -485,7 +485,7 @@ static int32_t pushDownCondOptPushCondToProject(SOptimizeContext* pCxt, SProject ...@@ -485,7 +485,7 @@ static int32_t pushDownCondOptPushCondToProject(SOptimizeContext* pCxt, SProject
return pushDownCondOptAppendCond(&pProject->node.pConditions, pCond); return pushDownCondOptAppendCond(&pProject->node.pConditions, pCond);
} }
static int32_t pushDownCondOptPushCondToJoin(SOptimizeContext* pCxt, SJoinLogicNode * pJoin, SNode** pCond) { static int32_t pushDownCondOptPushCondToJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode** pCond) {
return pushDownCondOptAppendCond(&pJoin->node.pConditions, pCond); return pushDownCondOptAppendCond(&pJoin->node.pConditions, pCond);
} }
...@@ -821,7 +821,7 @@ static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAg ...@@ -821,7 +821,7 @@ static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAg
typedef struct SRewriteProjCondContext { typedef struct SRewriteProjCondContext {
SProjectLogicNode* pProj; SProjectLogicNode* pProj;
int32_t errCode; int32_t errCode;
}SRewriteProjCondContext; } SRewriteProjCondContext;
static EDealRes rewriteProjectCondForPushDownImpl(SNode** ppNode, void* pContext) { static EDealRes rewriteProjectCondForPushDownImpl(SNode** ppNode, void* pContext) {
SRewriteProjCondContext* pCxt = pContext; SRewriteProjCondContext* pCxt = pContext;
...@@ -849,7 +849,8 @@ static EDealRes rewriteProjectCondForPushDownImpl(SNode** ppNode, void* pContext ...@@ -849,7 +849,8 @@ static EDealRes rewriteProjectCondForPushDownImpl(SNode** ppNode, void* pContext
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }
static int32_t rewriteProjectCondForPushDown(SOptimizeContext* pCxt, SProjectLogicNode* pProject, SNode** ppProjectCond) { static int32_t rewriteProjectCondForPushDown(SOptimizeContext* pCxt, SProjectLogicNode* pProject,
SNode** ppProjectCond) {
SRewriteProjCondContext cxt = {.pProj = pProject, .errCode = TSDB_CODE_SUCCESS}; SRewriteProjCondContext cxt = {.pProj = pProject, .errCode = TSDB_CODE_SUCCESS};
SNode* pProjectCond = pProject->node.pConditions; SNode* pProjectCond = pProject->node.pConditions;
nodesRewriteExpr(&pProjectCond, rewriteProjectCondForPushDownImpl, &cxt); nodesRewriteExpr(&pProjectCond, rewriteProjectCondForPushDownImpl, &cxt);
...@@ -2082,13 +2083,18 @@ static const int32_t optimizeRuleNum = (sizeof(optimizeRuleSet) / sizeof(SOptimi ...@@ -2082,13 +2083,18 @@ static const int32_t optimizeRuleNum = (sizeof(optimizeRuleSet) / sizeof(SOptimi
static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) { static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
char* pStr = NULL; char* pStr = NULL;
nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL); nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL);
if (NULL == pRuleName) {
qDebugL("before optimize: %s", pStr);
} else {
qDebugL("apply optimize %s rule: %s", pRuleName, pStr); qDebugL("apply optimize %s rule: %s", pRuleName, pStr);
}
taosMemoryFree(pStr); taosMemoryFree(pStr);
} }
static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) { static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
SOptimizeContext cxt = {.pPlanCxt = pCxt, .optimized = false}; SOptimizeContext cxt = {.pPlanCxt = pCxt, .optimized = false};
bool optimized = false; bool optimized = false;
dumpLogicSubplan(NULL, pLogicSubplan);
do { do {
optimized = false; optimized = false;
for (int32_t i = 0; i < optimizeRuleNum; ++i) { for (int32_t i = 0; i < optimizeRuleNum; ++i) {
......
...@@ -1502,7 +1502,7 @@ static int32_t buildInsertValuesSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLog ...@@ -1502,7 +1502,7 @@ static int32_t buildInsertValuesSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLog
return createDataInserter(pCxt, pModify->pVgDataBlocks, &pSubplan->pDataSink); return createDataInserter(pCxt, pModify->pVgDataBlocks, &pSubplan->pDataSink);
} }
static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, const SPhysiNode* pRoot, static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan,
SDataSinkNode** pSink) { SDataSinkNode** pSink) {
SQueryInserterNode* pInserter = (SQueryInserterNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT); SQueryInserterNode* pInserter = (SQueryInserterNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT);
if (NULL == pInserter) { if (NULL == pInserter) {
...@@ -1514,10 +1514,12 @@ static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNod ...@@ -1514,10 +1514,12 @@ static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNod
strcpy(pInserter->tableFName, pModify->tableFName); strcpy(pInserter->tableFName, pModify->tableFName);
pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId; pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId;
pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet; pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet;
vgroupInfoToNodeAddr(pModify->pVgroupList->vgroups, &pSubplan->execNode);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pInserter->sink.pInputDataBlockDesc = (SDataBlockDescNode*)nodesCloneNode((SNode*)pRoot->pOutputDataBlockDesc); pInserter->sink.pInputDataBlockDesc =
(SDataBlockDescNode*)nodesCloneNode((SNode*)pSubplan->pNode->pOutputDataBlockDesc);
if (NULL == pInserter->sink.pInputDataBlockDesc) { if (NULL == pInserter->sink.pInputDataBlockDesc) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
} }
...@@ -1535,7 +1537,7 @@ static int32_t buildInsertSelectSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLog ...@@ -1535,7 +1537,7 @@ static int32_t buildInsertSelectSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLog
int32_t code = int32_t code =
createPhysiNode(pCxt, (SLogicNode*)nodesListGetNode(pModify->node.pChildren, 0), pSubplan, &pSubplan->pNode); createPhysiNode(pCxt, (SLogicNode*)nodesListGetNode(pModify->node.pChildren, 0), pSubplan, &pSubplan->pNode);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = createQueryInserter(pCxt, pModify, pSubplan->pNode, &pSubplan->pDataSink); code = createQueryInserter(pCxt, pModify, pSubplan, &pSubplan->pDataSink);
} }
pSubplan->msgType = TDMT_VND_SUBMIT; pSubplan->msgType = TDMT_VND_SUBMIT;
return code; return code;
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#define SPLIT_FLAG_MASK(n) (1 << n) #define SPLIT_FLAG_MASK(n) (1 << n)
#define SPLIT_FLAG_STABLE_SPLIT SPLIT_FLAG_MASK(0) #define SPLIT_FLAG_STABLE_SPLIT SPLIT_FLAG_MASK(0)
#define SPLIT_FLAG_INSERT_SPLIT SPLIT_FLAG_MASK(1)
#define SPLIT_FLAG_SET_MASK(val, mask) (val) |= (mask) #define SPLIT_FLAG_SET_MASK(val, mask) (val) |= (mask)
#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0) #define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
...@@ -1196,6 +1197,41 @@ static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { ...@@ -1196,6 +1197,41 @@ static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
return code; return code;
} }
typedef struct SInsertSelectSplitInfo {
SLogicNode* pQueryRoot;
SLogicSubplan* pSubplan;
} SInsertSelectSplitInfo;
static bool insSelSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
SInsertSelectSplitInfo* pInfo) {
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pNode) && 1 == LIST_LENGTH(pNode->pChildren) &&
MODIFY_TABLE_TYPE_INSERT == ((SVnodeModifyLogicNode*)pNode)->modifyType) {
pInfo->pQueryRoot = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
pInfo->pSubplan = pSubplan;
return true;
}
return false;
}
static int32_t insertSelectSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
SInsertSelectSplitInfo info = {0};
if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_INSERT_SPLIT, (FSplFindSplitNode)insSelSplFindSplitNode, &info)) {
return TSDB_CODE_SUCCESS;
}
int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pQueryRoot, info.pSubplan->subplanType);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, info.pQueryRoot, 0));
}
if (TSDB_CODE_SUCCESS == code) {
info.pSubplan->subplanType = SUBPLAN_TYPE_MODIFY;
SPLIT_FLAG_SET_MASK(info.pSubplan->splitFlag, SPLIT_FLAG_INSERT_SPLIT);
}
++(pCxt->groupId);
pCxt->split = true;
return code;
}
typedef struct SQnodeSplitInfo { typedef struct SQnodeSplitInfo {
SLogicNode* pSplitNode; SLogicNode* pSplitNode;
SLogicSubplan* pSubplan; SLogicSubplan* pSubplan;
...@@ -1249,7 +1285,8 @@ static const SSplitRule splitRuleSet[] = { ...@@ -1249,7 +1285,8 @@ static const SSplitRule splitRuleSet[] = {
{.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit}, {.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit},
{.pName = "UnionAllSplit", .splitFunc = unionAllSplit}, {.pName = "UnionAllSplit", .splitFunc = unionAllSplit},
{.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit}, {.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit},
{.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit} {.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit},
{.pName = "InsertSelectSplit", .splitFunc = insertSelectSplit}
}; };
// clang-format on // clang-format on
...@@ -1258,7 +1295,11 @@ static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule)); ...@@ -1258,7 +1295,11 @@ static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));
static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) { static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
char* pStr = NULL; char* pStr = NULL;
nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL); nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL);
if (NULL == pRuleName) {
qDebugL("before split: %s", pStr);
} else {
qDebugL("apply split %s rule: %s", pRuleName, pStr); qDebugL("apply split %s rule: %s", pRuleName, pStr);
}
taosMemoryFree(pStr); taosMemoryFree(pStr);
} }
...@@ -1266,6 +1307,7 @@ static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) { ...@@ -1266,6 +1307,7 @@ static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) {
SSplitContext cxt = { SSplitContext cxt = {
.pPlanCxt = pCxt, .queryId = pSubplan->id.queryId, .groupId = pSubplan->id.groupId + 1, .split = false}; .pPlanCxt = pCxt, .queryId = pSubplan->id.queryId, .groupId = pSubplan->id.groupId + 1, .split = false};
bool split = false; bool split = false;
dumpLogicSubplan(NULL, pSubplan);
do { do {
split = false; split = false;
for (int32_t i = 0; i < splitRuleNum; ++i) { for (int32_t i = 0; i < splitRuleNum; ++i) {
...@@ -1293,8 +1335,16 @@ static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) { ...@@ -1293,8 +1335,16 @@ static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) {
FOREACH(pChild, pNode->pChildren) { setVgroupsInfo((SLogicNode*)pChild, pSubplan); } FOREACH(pChild, pNode->pChildren) { setVgroupsInfo((SLogicNode*)pChild, pSubplan); }
} }
static bool needSplitSubplan(SLogicSubplan* pLogicSubplan) {
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY != nodeType(pLogicSubplan->pNode)) {
return true;
}
SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pLogicSubplan->pNode;
return (MODIFY_TABLE_TYPE_INSERT == pModify->modifyType && NULL != pModify->node.pChildren);
}
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) { int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pLogicSubplan->pNode)) { if (!needSplitSubplan(pLogicSubplan)) {
setVgroupsInfo(pLogicSubplan->pNode, pLogicSubplan); setVgroupsInfo(pLogicSubplan->pNode, pLogicSubplan);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -95,5 +95,5 @@ TEST_F(PlanOtherTest, delete) { ...@@ -95,5 +95,5 @@ TEST_F(PlanOtherTest, delete) {
TEST_F(PlanOtherTest, insert) { TEST_F(PlanOtherTest, insert) {
useDb("root", "test"); useDb("root", "test");
// run("INSERT INTO t1 SELECT * FROM t1"); run("INSERT INTO t1 SELECT * FROM t1");
} }
...@@ -96,7 +96,7 @@ class TDTestCase: ...@@ -96,7 +96,7 @@ class TDTestCase:
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)", ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
) )
# conn.load_table_info("log") # conn.load_table_info("log")
tdLog.debug("statement start")
start = datetime.now() start = datetime.now()
stmt = conn.statement("insert into stb1 values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") stmt = conn.statement("insert into stb1 values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
...@@ -118,8 +118,11 @@ class TDTestCase: ...@@ -118,8 +118,11 @@ class TDTestCase:
params[14].nchar(["涛思数据", None, "a long string with 中文字符"]) params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591]) params[15].timestamp([None, None, 1626861392591])
# print(type(stmt)) # print(type(stmt))
tdLog.debug("bind_param_batch start")
stmt.bind_param_batch(params) stmt.bind_param_batch(params)
tdLog.debug("bind_param_batch end")
stmt.execute() stmt.execute()
tdLog.debug("execute end")
end = datetime.now() end = datetime.now()
print("elapsed time: ", end - start) print("elapsed time: ", end - start)
assert stmt.affected_rows == 3 assert stmt.affected_rows == 3
...@@ -155,7 +158,7 @@ class TDTestCase: ...@@ -155,7 +158,7 @@ class TDTestCase:
print(rows1) print(rows1)
assert str(rows1[0][0]) == "2021-07-21 17:56:32.589000" assert str(rows1[0][0]) == "2021-07-21 17:56:32.589000"
assert rows1[0][10] == 3 assert rows1[0][10] == 3
tdLog.debug("close start")
stmt.close() stmt.close()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册