提交 c8508248 编写于 作者: X Xiaoyu Wang

feat: sql command 'insert ... select'

上级 cf9a6dea
...@@ -204,65 +204,65 @@ ...@@ -204,65 +204,65 @@
#define TK_SPLIT 186 #define TK_SPLIT 186
#define TK_SYNCDB 187 #define TK_SYNCDB 187
#define TK_DELETE 188 #define TK_DELETE 188
#define TK_NULL 189 #define TK_INSERT 189
#define TK_NK_QUESTION 190 #define TK_NULL 190
#define TK_NK_ARROW 191 #define TK_NK_QUESTION 191
#define TK_ROWTS 192 #define TK_NK_ARROW 192
#define TK_TBNAME 193 #define TK_ROWTS 193
#define TK_QSTARTTS 194 #define TK_TBNAME 194
#define TK_QENDTS 195 #define TK_QSTARTTS 195
#define TK_WSTARTTS 196 #define TK_QENDTS 196
#define TK_WENDTS 197 #define TK_WSTARTTS 197
#define TK_WDURATION 198 #define TK_WENDTS 198
#define TK_CAST 199 #define TK_WDURATION 199
#define TK_NOW 200 #define TK_CAST 200
#define TK_TODAY 201 #define TK_NOW 201
#define TK_TIMEZONE 202 #define TK_TODAY 202
#define TK_CLIENT_VERSION 203 #define TK_TIMEZONE 203
#define TK_SERVER_VERSION 204 #define TK_CLIENT_VERSION 204
#define TK_SERVER_STATUS 205 #define TK_SERVER_VERSION 205
#define TK_CURRENT_USER 206 #define TK_SERVER_STATUS 206
#define TK_COUNT 207 #define TK_CURRENT_USER 207
#define TK_LAST_ROW 208 #define TK_COUNT 208
#define TK_BETWEEN 209 #define TK_LAST_ROW 209
#define TK_IS 210 #define TK_BETWEEN 210
#define TK_NK_LT 211 #define TK_IS 211
#define TK_NK_GT 212 #define TK_NK_LT 212
#define TK_NK_LE 213 #define TK_NK_GT 213
#define TK_NK_GE 214 #define TK_NK_LE 214
#define TK_NK_NE 215 #define TK_NK_GE 215
#define TK_MATCH 216 #define TK_NK_NE 216
#define TK_NMATCH 217 #define TK_MATCH 217
#define TK_CONTAINS 218 #define TK_NMATCH 218
#define TK_JOIN 219 #define TK_CONTAINS 219
#define TK_INNER 220 #define TK_JOIN 220
#define TK_SELECT 221 #define TK_INNER 221
#define TK_DISTINCT 222 #define TK_SELECT 222
#define TK_WHERE 223 #define TK_DISTINCT 223
#define TK_PARTITION 224 #define TK_WHERE 224
#define TK_BY 225 #define TK_PARTITION 225
#define TK_SESSION 226 #define TK_BY 226
#define TK_STATE_WINDOW 227 #define TK_SESSION 227
#define TK_SLIDING 228 #define TK_STATE_WINDOW 228
#define TK_FILL 229 #define TK_SLIDING 229
#define TK_VALUE 230 #define TK_FILL 230
#define TK_NONE 231 #define TK_VALUE 231
#define TK_PREV 232 #define TK_NONE 232
#define TK_LINEAR 233 #define TK_PREV 233
#define TK_NEXT 234 #define TK_LINEAR 234
#define TK_HAVING 235 #define TK_NEXT 235
#define TK_RANGE 236 #define TK_HAVING 236
#define TK_EVERY 237 #define TK_RANGE 237
#define TK_ORDER 238 #define TK_EVERY 238
#define TK_SLIMIT 239 #define TK_ORDER 239
#define TK_SOFFSET 240 #define TK_SLIMIT 240
#define TK_LIMIT 241 #define TK_SOFFSET 241
#define TK_OFFSET 242 #define TK_LIMIT 242
#define TK_ASC 243 #define TK_OFFSET 243
#define TK_NULLS 244 #define TK_ASC 244
#define TK_ID 245 #define TK_NULLS 245
#define TK_NK_BITNOT 246 #define TK_ID 246
#define TK_INSERT 247 #define TK_NK_BITNOT 247
#define TK_VALUES 248 #define TK_VALUES 248
#define TK_IMPORT 249 #define TK_IMPORT 249
#define TK_NK_SEMI 250 #define TK_NK_SEMI 250
......
...@@ -194,6 +194,7 @@ typedef enum ENodeType { ...@@ -194,6 +194,7 @@ typedef enum ENodeType {
QUERY_NODE_KILL_QUERY_STMT, QUERY_NODE_KILL_QUERY_STMT,
QUERY_NODE_KILL_TRANSACTION_STMT, QUERY_NODE_KILL_TRANSACTION_STMT,
QUERY_NODE_DELETE_STMT, QUERY_NODE_DELETE_STMT,
QUERY_NODE_INSERT_STMT,
QUERY_NODE_QUERY, QUERY_NODE_QUERY,
// logic plan node // logic plan node
...@@ -247,6 +248,7 @@ typedef enum ENodeType { ...@@ -247,6 +248,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC,
QUERY_NODE_PHYSICAL_PLAN_DISPATCH, QUERY_NODE_PHYSICAL_PLAN_DISPATCH,
QUERY_NODE_PHYSICAL_PLAN_INSERT, QUERY_NODE_PHYSICAL_PLAN_INSERT,
QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT,
QUERY_NODE_PHYSICAL_PLAN_DELETE, QUERY_NODE_PHYSICAL_PLAN_DELETE,
QUERY_NODE_PHYSICAL_SUBPLAN, QUERY_NODE_PHYSICAL_SUBPLAN,
QUERY_NODE_PHYSICAL_PLAN QUERY_NODE_PHYSICAL_PLAN
......
...@@ -131,6 +131,7 @@ typedef struct SVnodeModifyLogicNode { ...@@ -131,6 +131,7 @@ typedef struct SVnodeModifyLogicNode {
int8_t tableType; // table type int8_t tableType; // table type
char tableFName[TSDB_TABLE_FNAME_LEN]; char tableFName[TSDB_TABLE_FNAME_LEN];
STimeWindow deleteTimeRange; STimeWindow deleteTimeRange;
SVgroupsInfo* pVgroupList;
} SVnodeModifyLogicNode; } SVnodeModifyLogicNode;
typedef struct SExchangeLogicNode { typedef struct SExchangeLogicNode {
...@@ -456,6 +457,15 @@ typedef struct SDataInserterNode { ...@@ -456,6 +457,15 @@ typedef struct SDataInserterNode {
char* pData; char* pData;
} SDataInserterNode; } SDataInserterNode;
typedef struct SQueryInserterNode {
SDataSinkNode sink;
uint64_t tableId;
int8_t tableType; // table type
char tableFName[TSDB_TABLE_FNAME_LEN];
int32_t vgId;
SEpSet epSet;
} SQueryInserterNode;
typedef struct SDataDeleterNode { typedef struct SDataDeleterNode {
SDataSinkNode sink; SDataSinkNode sink;
uint64_t tableId; uint64_t tableId;
......
...@@ -302,6 +302,14 @@ typedef struct SDeleteStmt { ...@@ -302,6 +302,14 @@ typedef struct SDeleteStmt {
bool deleteZeroRows; bool deleteZeroRows;
} SDeleteStmt; } SDeleteStmt;
typedef struct SInsertStmt {
ENodeType type; // QUERY_NODE_INSERT_STMT
SNode* pTable;
SNodeList* pCols;
SNode* pQuery;
uint8_t precision;
} SInsertStmt;
typedef enum { typedef enum {
PAYLOAD_TYPE_KV = 0, PAYLOAD_TYPE_KV = 0,
PAYLOAD_TYPE_RAW = 1, PAYLOAD_TYPE_RAW = 1,
......
...@@ -56,7 +56,7 @@ typedef struct SParseContext { ...@@ -56,7 +56,7 @@ typedef struct SParseContext {
} SParseContext; } SParseContext;
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery); int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);
bool qIsInsertSql(const char* pStr, size_t length); bool qIsInsertValuesSql(const char* pStr, size_t length);
// for async mode // for async mode
int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq); int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq);
......
...@@ -324,9 +324,9 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) { ...@@ -324,9 +324,9 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
} }
int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataBlocks* pDataBlock, STableDataBlocks** newBlock, uint64_t uid) { int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataBlocks* pDataBlock, STableDataBlocks** newBlock, uint64_t uid) {
SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp); SEpSet ep = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
SVgroupInfo vgInfo = {0}; SVgroupInfo vgInfo = {0};
SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
.requestId = pStmt->exec.pRequest->requestId, .requestId = pStmt->exec.pRequest->requestId,
.requestObjRefId = pStmt->exec.pRequest->self, .requestObjRefId = pStmt->exec.pRequest->self,
.mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
...@@ -391,13 +391,12 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { ...@@ -391,13 +391,12 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
STMT_RET(stmtCleanBindInfo(pStmt)); STMT_RET(stmtCleanBindInfo(pStmt));
} }
STableMeta* pTableMeta = NULL; STableMeta* pTableMeta = NULL;
SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pStmt->taos->pAppInfo->pTransporter,
.requestId = pStmt->exec.pRequest->requestId, .requestId = pStmt->exec.pRequest->requestId,
.requestObjRefId = pStmt->exec.pRequest->self, .requestObjRefId = pStmt->exec.pRequest->self,
.mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
int32_t code = int32_t code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) { if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
STMT_ERR_RET(stmtCleanBindInfo(pStmt)); STMT_ERR_RET(stmtCleanBindInfo(pStmt));
...@@ -849,7 +848,7 @@ int stmtIsInsert(TAOS_STMT* stmt, int* insert) { ...@@ -849,7 +848,7 @@ int stmtIsInsert(TAOS_STMT* stmt, int* insert) {
if (pStmt->sql.type) { if (pStmt->sql.type) {
*insert = (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type); *insert = (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type);
} else { } else {
*insert = qIsInsertSql(pStmt->sql.sqlStr, 0); *insert = qIsInsertValuesSql(pStmt->sql.sqlStr, 0);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -861,7 +860,7 @@ int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) { ...@@ -861,7 +860,7 @@ int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR); STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
} }
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
...@@ -893,7 +892,7 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) { ...@@ -893,7 +892,7 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR); STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
} }
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
...@@ -919,7 +918,6 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) { ...@@ -919,7 +918,6 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int stmtGetParamNum(TAOS_STMT* stmt, int* nums) { int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
STscStmt* pStmt = (STscStmt*)stmt; STscStmt* pStmt = (STscStmt*)stmt;
...@@ -952,13 +950,13 @@ int stmtGetParamNum(TAOS_STMT* stmt, int* nums) { ...@@ -952,13 +950,13 @@ int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int stmtGetParam(TAOS_STMT *stmt, int idx, int *type, int *bytes) { int stmtGetParam(TAOS_STMT* stmt, int idx, int* type, int* bytes) {
STscStmt* pStmt = (STscStmt*)stmt; STscStmt* pStmt = (STscStmt*)stmt;
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR); STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
} }
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS)); STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
...@@ -979,8 +977,8 @@ int stmtGetParam(TAOS_STMT *stmt, int idx, int *type, int *bytes) { ...@@ -979,8 +977,8 @@ int stmtGetParam(TAOS_STMT *stmt, int idx, int *type, int *bytes) {
STMT_ERR_RET(stmtParseSql(pStmt)); STMT_ERR_RET(stmtParseSql(pStmt));
} }
int32_t nums = 0; int32_t nums = 0;
TAOS_FIELD_E *pField = NULL; TAOS_FIELD_E* pField = NULL;
STMT_ERR_RET(stmtFetchColFields(stmt, &nums, &pField)); STMT_ERR_RET(stmtFetchColFields(stmt, &nums, &pField));
if (idx >= nums) { if (idx >= nums) {
tscError("idx %d is too big", idx); tscError("idx %d is too big", idx);
......
...@@ -19,8 +19,8 @@ ...@@ -19,8 +19,8 @@
#include "query.h" #include "query.h"
#include "querynodes.h" #include "querynodes.h"
#include "taoserror.h" #include "taoserror.h"
#include "tjson.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tjson.h"
static int32_t nodeToJson(const void* pObj, SJson* pJson); static int32_t nodeToJson(const void* pObj, SJson* pJson);
static int32_t jsonToNode(const SJson* pJson, void* pObj); static int32_t jsonToNode(const SJson* pJson, void* pObj);
...@@ -179,6 +179,8 @@ const char* nodesNodeName(ENodeType type) { ...@@ -179,6 +179,8 @@ const char* nodesNodeName(ENodeType type) {
return "ShowVnodeStmt"; return "ShowVnodeStmt";
case QUERY_NODE_DELETE_STMT: case QUERY_NODE_DELETE_STMT:
return "DeleteStmt"; return "DeleteStmt";
case QUERY_NODE_INSERT_STMT:
return "InsertStmt";
case QUERY_NODE_LOGIC_PLAN_SCAN: case QUERY_NODE_LOGIC_PLAN_SCAN:
return "LogicScan"; return "LogicScan";
case QUERY_NODE_LOGIC_PLAN_JOIN: case QUERY_NODE_LOGIC_PLAN_JOIN:
...@@ -2641,9 +2643,9 @@ static int32_t datumToJson(const void* pObj, SJson* pJson) { ...@@ -2641,9 +2643,9 @@ static int32_t datumToJson(const void* pObj, SJson* pJson) {
case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARBINARY:
code = tjsonAddStringToObject(pJson, jkValueDatum, varDataVal(pNode->datum.p)); code = tjsonAddStringToObject(pJson, jkValueDatum, varDataVal(pNode->datum.p));
break; break;
case TSDB_DATA_TYPE_JSON:{ case TSDB_DATA_TYPE_JSON: {
int32_t len = getJsonValueLen(pNode->datum.p); int32_t len = getJsonValueLen(pNode->datum.p);
char* buf = taosMemoryCalloc( len * 2 + 1, sizeof(char)); char* buf = taosMemoryCalloc(len * 2 + 1, sizeof(char));
code = taosHexEncode(pNode->datum.p, buf, len); code = taosHexEncode(pNode->datum.p, buf, len);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(buf); taosMemoryFree(buf);
...@@ -2775,7 +2777,7 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) { ...@@ -2775,7 +2777,7 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) {
} }
break; break;
} }
case TSDB_DATA_TYPE_JSON:{ case TSDB_DATA_TYPE_JSON: {
pNode->datum.p = taosMemoryCalloc(1, pNode->node.resType.bytes); pNode->datum.p = taosMemoryCalloc(1, pNode->node.resType.bytes);
if (NULL == pNode->datum.p) { if (NULL == pNode->datum.p) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
......
...@@ -229,6 +229,8 @@ SNode* nodesMakeNode(ENodeType type) { ...@@ -229,6 +229,8 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SKillStmt)); return makeNode(type, sizeof(SKillStmt));
case QUERY_NODE_DELETE_STMT: case QUERY_NODE_DELETE_STMT:
return makeNode(type, sizeof(SDeleteStmt)); return makeNode(type, sizeof(SDeleteStmt));
case QUERY_NODE_INSERT_STMT:
return makeNode(type, sizeof(SInsertStmt));
case QUERY_NODE_QUERY: case QUERY_NODE_QUERY:
return makeNode(type, sizeof(SQuery)); return makeNode(type, sizeof(SQuery));
case QUERY_NODE_LOGIC_PLAN_SCAN: case QUERY_NODE_LOGIC_PLAN_SCAN:
...@@ -690,6 +692,13 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -690,6 +692,13 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pStmt->pTagCond); nodesDestroyNode(pStmt->pTagCond);
break; break;
} }
case QUERY_NODE_INSERT_STMT: {
SInsertStmt* pStmt = (SInsertStmt*)pNode;
nodesDestroyNode(pStmt->pTable);
nodesDestroyList(pStmt->pCols);
nodesDestroyNode(pStmt->pQuery);
break;
}
case QUERY_NODE_QUERY: { case QUERY_NODE_QUERY: {
SQuery* pQuery = (SQuery*)pNode; SQuery* pQuery = (SQuery*)pNode;
nodesDestroyNode(pQuery->pRoot); nodesDestroyNode(pQuery->pRoot);
...@@ -1524,7 +1533,6 @@ int32_t nodesCollectColumnsFromNode(SNode* node, const char* pTableAlias, EColle ...@@ -1524,7 +1533,6 @@ int32_t nodesCollectColumnsFromNode(SNode* node, const char* pTableAlias, EColle
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
typedef struct SCollectFuncsCxt { typedef struct SCollectFuncsCxt {
......
...@@ -210,6 +210,7 @@ SNode* createSyncdbStmt(SAstCreateContext* pCxt, const SToken* pDbName); ...@@ -210,6 +210,7 @@ SNode* createSyncdbStmt(SAstCreateContext* pCxt, const SToken* pDbName);
SNode* createGrantStmt(SAstCreateContext* pCxt, int64_t privileges, SToken* pDbName, SToken* pUserName); SNode* createGrantStmt(SAstCreateContext* pCxt, int64_t privileges, SToken* pDbName, SToken* pUserName);
SNode* createRevokeStmt(SAstCreateContext* pCxt, int64_t privileges, SToken* pDbName, SToken* pUserName); SNode* createRevokeStmt(SAstCreateContext* pCxt, int64_t privileges, SToken* pDbName, SToken* pUserName);
SNode* createDeleteStmt(SAstCreateContext* pCxt, SNode* pTable, SNode* pWhere); SNode* createDeleteStmt(SAstCreateContext* pCxt, SNode* pTable, SNode* pWhere);
SNode* createInsertStmt(SAstCreateContext* pCxt, SNode* pTable, SNodeList* pCols, SNode* pQuery);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -259,7 +259,7 @@ multi_create_clause(A) ::= multi_create_clause(B) create_subtable_clause(C). ...@@ -259,7 +259,7 @@ multi_create_clause(A) ::= multi_create_clause(B) create_subtable_clause(C).
create_subtable_clause(A) ::= create_subtable_clause(A) ::=
not_exists_opt(B) full_table_name(C) USING full_table_name(D) not_exists_opt(B) full_table_name(C) USING full_table_name(D)
specific_tags_opt(E) TAGS NK_LP expression_list(F) NK_RP table_options(G). { A = createCreateSubTableClause(pCxt, B, C, D, E, F, G); } specific_cols_opt(E) TAGS NK_LP expression_list(F) NK_RP table_options(G). { A = createCreateSubTableClause(pCxt, B, C, D, E, F, G); }
%type multi_drop_clause { SNodeList* } %type multi_drop_clause { SNodeList* }
%destructor multi_drop_clause { nodesDestroyList($$); } %destructor multi_drop_clause { nodesDestroyList($$); }
...@@ -268,10 +268,10 @@ multi_drop_clause(A) ::= multi_drop_clause(B) drop_table_clause(C). ...@@ -268,10 +268,10 @@ multi_drop_clause(A) ::= multi_drop_clause(B) drop_table_clause(C).
drop_table_clause(A) ::= exists_opt(B) full_table_name(C). { A = createDropTableClause(pCxt, B, C); } drop_table_clause(A) ::= exists_opt(B) full_table_name(C). { A = createDropTableClause(pCxt, B, C); }
%type specific_tags_opt { SNodeList* } %type specific_cols_opt { SNodeList* }
%destructor specific_tags_opt { nodesDestroyList($$); } %destructor specific_cols_opt { nodesDestroyList($$); }
specific_tags_opt(A) ::= . { A = NULL; } specific_cols_opt(A) ::= . { A = NULL; }
specific_tags_opt(A) ::= NK_LP col_name_list(B) NK_RP. { A = B; } specific_cols_opt(A) ::= NK_LP col_name_list(B) NK_RP. { A = B; }
full_table_name(A) ::= table_name(B). { A = createRealTableNode(pCxt, NULL, &B, NULL); } full_table_name(A) ::= table_name(B). { A = createRealTableNode(pCxt, NULL, &B, NULL); }
full_table_name(A) ::= db_name(B) NK_DOT table_name(C). { A = createRealTableNode(pCxt, &B, &C, NULL); } full_table_name(A) ::= db_name(B) NK_DOT table_name(C). { A = createRealTableNode(pCxt, &B, &C, NULL); }
...@@ -515,6 +515,9 @@ cmd ::= DELETE FROM full_table_name(A) where_clause_opt(B). ...@@ -515,6 +515,9 @@ cmd ::= DELETE FROM full_table_name(A) where_clause_opt(B).
/************************************************ select **************************************************************/ /************************************************ select **************************************************************/
cmd ::= query_expression(A). { pCxt->pRootNode = A; } cmd ::= query_expression(A). { pCxt->pRootNode = A; }
/************************************************ insert **************************************************************/
cmd ::= INSERT INTO full_table_name(A) specific_cols_opt(B) query_expression(C). { pCxt->pRootNode = createInsertStmt(pCxt, A, B, C); }
/************************************************ literal *************************************************************/ /************************************************ literal *************************************************************/
literal(A) ::= NK_INTEGER(B). { A = createRawExprNode(pCxt, &B, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B)); } literal(A) ::= NK_INTEGER(B). { A = createRawExprNode(pCxt, &B, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B)); }
literal(A) ::= NK_FLOAT(B). { A = createRawExprNode(pCxt, &B, createValueNode(pCxt, TSDB_DATA_TYPE_DOUBLE, &B)); } literal(A) ::= NK_FLOAT(B). { A = createRawExprNode(pCxt, &B, createValueNode(pCxt, TSDB_DATA_TYPE_DOUBLE, &B)); }
...@@ -973,4 +976,4 @@ null_ordering_opt(A) ::= . ...@@ -973,4 +976,4 @@ null_ordering_opt(A) ::= .
null_ordering_opt(A) ::= NULLS FIRST. { A = NULL_ORDER_FIRST; } null_ordering_opt(A) ::= NULLS FIRST. { A = NULL_ORDER_FIRST; }
null_ordering_opt(A) ::= NULLS LAST. { A = NULL_ORDER_LAST; } null_ordering_opt(A) ::= NULLS LAST. { A = NULL_ORDER_LAST; }
%fallback ID NK_BITNOT INSERT VALUES IMPORT NK_SEMI FILE. %fallback ID NK_BITNOT VALUES IMPORT NK_SEMI FILE.
...@@ -1662,3 +1662,13 @@ SNode* createDeleteStmt(SAstCreateContext* pCxt, SNode* pTable, SNode* pWhere) { ...@@ -1662,3 +1662,13 @@ SNode* createDeleteStmt(SAstCreateContext* pCxt, SNode* pTable, SNode* pWhere) {
} }
return (SNode*)pStmt; return (SNode*)pStmt;
} }
SNode* createInsertStmt(SAstCreateContext* pCxt, SNode* pTable, SNodeList* pCols, SNode* pQuery) {
CHECK_PARSER_STATUS(pCxt);
SInsertStmt* pStmt = (SInsertStmt*)nodesMakeNode(QUERY_NODE_INSERT_STMT);
CHECK_OUT_OF_MEM(pStmt);
pStmt->pTable = pTable;
pStmt->pCols = pCols;
pStmt->pQuery = pQuery;
return (SNode*)pStmt;
}
...@@ -447,6 +447,14 @@ static int32_t collectMetaKeyFromDelete(SCollectMetaKeyCxt* pCxt, SDeleteStmt* p ...@@ -447,6 +447,14 @@ static int32_t collectMetaKeyFromDelete(SCollectMetaKeyCxt* pCxt, SDeleteStmt* p
return collectMetaKeyFromRealTableImpl(pCxt, (SRealTableNode*)pStmt->pFromTable, AUTH_TYPE_WRITE); return collectMetaKeyFromRealTableImpl(pCxt, (SRealTableNode*)pStmt->pFromTable, AUTH_TYPE_WRITE);
} }
static int32_t collectMetaKeyFromInsert(SCollectMetaKeyCxt* pCxt, SInsertStmt* pStmt) {
int32_t code = collectMetaKeyFromRealTableImpl(pCxt, (SRealTableNode*)pStmt->pTable, AUTH_TYPE_WRITE);
if (TSDB_CODE_SUCCESS == code) {
code = collectMetaKeyFromQuery(pCxt, pStmt->pQuery);
}
return code;
}
static int32_t collectMetaKeyFromShowBlockDist(SCollectMetaKeyCxt* pCxt, SShowTableDistributedStmt* pStmt) { static int32_t collectMetaKeyFromShowBlockDist(SCollectMetaKeyCxt* pCxt, SShowTableDistributedStmt* pStmt) {
SName name = {.type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId}; SName name = {.type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId};
strcpy(name.dbname, pStmt->dbName); strcpy(name.dbname, pStmt->dbName);
...@@ -554,6 +562,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { ...@@ -554,6 +562,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt); return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_DELETE_STMT: case QUERY_NODE_DELETE_STMT:
return collectMetaKeyFromDelete(pCxt, (SDeleteStmt*)pStmt); return collectMetaKeyFromDelete(pCxt, (SDeleteStmt*)pStmt);
case QUERY_NODE_INSERT_STMT:
return collectMetaKeyFromInsert(pCxt, (SInsertStmt*)pStmt);
case QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT: case QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT:
return collectMetaKeyFromShowBlockDist(pCxt, (SShowTableDistributedStmt*)pStmt); return collectMetaKeyFromShowBlockDist(pCxt, (SShowTableDistributedStmt*)pStmt);
case QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT: case QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT:
......
...@@ -39,7 +39,7 @@ static int32_t checkAuth(SAuthCxt* pCxt, const char* pDbName, AUTH_TYPE type) { ...@@ -39,7 +39,7 @@ static int32_t checkAuth(SAuthCxt* pCxt, const char* pDbName, AUTH_TYPE type) {
if (NULL != pCxt->pMetaCache) { if (NULL != pCxt->pMetaCache) {
code = getUserAuthFromCache(pCxt->pMetaCache, pParseCxt->pUser, dbFname, type, &pass); code = getUserAuthFromCache(pCxt->pMetaCache, pParseCxt->pUser, dbFname, type, &pass);
} else { } else {
SRequestConnInfo conn = {.pTrans = pParseCxt->pTransporter, SRequestConnInfo conn = {.pTrans = pParseCxt->pTransporter,
.requestId = pParseCxt->requestId, .requestId = pParseCxt->requestId,
.requestObjRefId = pParseCxt->requestRid, .requestObjRefId = pParseCxt->requestRid,
.mgmtEps = pParseCxt->mgmtEpSet}; .mgmtEps = pParseCxt->mgmtEpSet};
...@@ -88,6 +88,14 @@ static int32_t authDelete(SAuthCxt* pCxt, SDeleteStmt* pDelete) { ...@@ -88,6 +88,14 @@ static int32_t authDelete(SAuthCxt* pCxt, SDeleteStmt* pDelete) {
return checkAuth(pCxt, ((SRealTableNode*)pDelete->pFromTable)->table.dbName, AUTH_TYPE_WRITE); return checkAuth(pCxt, ((SRealTableNode*)pDelete->pFromTable)->table.dbName, AUTH_TYPE_WRITE);
} }
static int32_t authInsert(SAuthCxt* pCxt, SInsertStmt* pInsert) {
int32_t code = checkAuth(pCxt, ((SRealTableNode*)pInsert->pTable)->table.dbName, AUTH_TYPE_WRITE);
if (TSDB_CODE_SUCCESS == code) {
code = authQuery(pCxt, pInsert->pQuery);
}
return code;
}
static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) { static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
switch (nodeType(pStmt)) { switch (nodeType(pStmt)) {
case QUERY_NODE_SET_OPERATOR: case QUERY_NODE_SET_OPERATOR:
...@@ -98,6 +106,8 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) { ...@@ -98,6 +106,8 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
return authDropUser(pCxt, (SDropUserStmt*)pStmt); return authDropUser(pCxt, (SDropUserStmt*)pStmt);
case QUERY_NODE_DELETE_STMT: case QUERY_NODE_DELETE_STMT:
return authDelete(pCxt, (SDeleteStmt*)pStmt); return authDelete(pCxt, (SDeleteStmt*)pStmt);
case QUERY_NODE_INSERT_STMT:
return authInsert(pCxt, (SInsertStmt*)pStmt);
default: default:
break; break;
} }
......
...@@ -300,6 +300,14 @@ static int32_t calcConstDelete(SCalcConstContext* pCxt, SDeleteStmt* pDelete) { ...@@ -300,6 +300,14 @@ static int32_t calcConstDelete(SCalcConstContext* pCxt, SDeleteStmt* pDelete) {
return code; return code;
} }
static int32_t calcConstInsert(SCalcConstContext* pCxt, SInsertStmt* pInsert) {
int32_t code = calcConstFromTable(pCxt, pInsert->pTable);
if (TSDB_CODE_SUCCESS == code) {
code = calcConstQuery(pCxt, pInsert->pQuery, false);
}
return code;
}
static int32_t calcConstQuery(SCalcConstContext* pCxt, SNode* pStmt, bool subquery) { static int32_t calcConstQuery(SCalcConstContext* pCxt, SNode* pStmt, bool subquery) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pStmt)) { switch (nodeType(pStmt)) {
...@@ -320,6 +328,9 @@ static int32_t calcConstQuery(SCalcConstContext* pCxt, SNode* pStmt, bool subque ...@@ -320,6 +328,9 @@ static int32_t calcConstQuery(SCalcConstContext* pCxt, SNode* pStmt, bool subque
case QUERY_NODE_DELETE_STMT: case QUERY_NODE_DELETE_STMT:
code = calcConstDelete(pCxt, (SDeleteStmt*)pStmt); code = calcConstDelete(pCxt, (SDeleteStmt*)pStmt);
break; break;
case QUERY_NODE_INSERT_STMT:
code = calcConstInsert(pCxt, (SInsertStmt*)pStmt);
break;
default: default:
break; break;
} }
......
...@@ -2839,6 +2839,18 @@ static int32_t translateDelete(STranslateContext* pCxt, SDeleteStmt* pDelete) { ...@@ -2839,6 +2839,18 @@ static int32_t translateDelete(STranslateContext* pCxt, SDeleteStmt* pDelete) {
return code; return code;
} }
static int32_t translateInsert(STranslateContext* pCxt, SInsertStmt* pInsert) {
pCxt->pCurrStmt = (SNode*)pInsert;
int32_t code = translateFrom(pCxt, pInsert->pTable);
if (TSDB_CODE_SUCCESS == code) {
code = translateExprList(pCxt, pInsert->pCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateQuery(pCxt, pInsert->pQuery);
}
return code;
}
static int64_t getUnitPerMinute(uint8_t precision) { static int64_t getUnitPerMinute(uint8_t precision) {
switch (precision) { switch (precision) {
case TSDB_TIME_PRECISION_MILLI: case TSDB_TIME_PRECISION_MILLI:
...@@ -4608,6 +4620,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { ...@@ -4608,6 +4620,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_DELETE_STMT: case QUERY_NODE_DELETE_STMT:
code = translateDelete(pCxt, (SDeleteStmt*)pNode); code = translateDelete(pCxt, (SDeleteStmt*)pNode);
break; break;
case QUERY_NODE_INSERT_STMT:
code = translateInsert(pCxt, (SInsertStmt*)pNode);
break;
case QUERY_NODE_CREATE_DATABASE_STMT: case QUERY_NODE_CREATE_DATABASE_STMT:
code = translateCreateDatabase(pCxt, (SCreateDatabaseStmt*)pNode); code = translateCreateDatabase(pCxt, (SCreateDatabaseStmt*)pNode);
break; break;
...@@ -6224,6 +6239,10 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -6224,6 +6239,10 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = TDMT_VND_DELETE; pQuery->msgType = TDMT_VND_DELETE;
break; break;
case QUERY_NODE_INSERT_STMT:
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = TDMT_VND_SUBMIT;
break;
case QUERY_NODE_VNODE_MODIF_STMT: case QUERY_NODE_VNODE_MODIF_STMT:
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = toMsgType(((SVnodeModifOpStmt*)pQuery->pRoot)->sqlNodeType); pQuery->msgType = toMsgType(((SVnodeModifOpStmt*)pQuery->pRoot)->sqlNodeType);
......
...@@ -19,19 +19,27 @@ ...@@ -19,19 +19,27 @@
#include "parInt.h" #include "parInt.h"
#include "parToken.h" #include "parToken.h"
bool qIsInsertSql(const char* pStr, size_t length) { bool qIsInsertValuesSql(const char* pStr, size_t length) {
if (NULL == pStr) { if (NULL == pStr) {
return false; return false;
} }
const char* pSql = pStr;
int32_t index = 0; int32_t index = 0;
SToken t = tStrGetToken((char*)pStr, &index, false);
if (TK_INSERT != t.type && TK_IMPORT != t.type) {
return false;
}
do { do {
SToken t0 = tStrGetToken((char*)pStr, &index, false); pStr += index;
if (t0.type != TK_NK_LP) { t = tStrGetToken((char*)pStr, &index, false);
return t0.type == TK_INSERT || t0.type == TK_IMPORT; if (TK_USING == t.type || TK_VALUES == t.type) {
return true;
} }
} while (1); } while (pStr - pSql < length);
return false;
} }
static int32_t analyseSemantic(SParseContext* pCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) { static int32_t analyseSemantic(SParseContext* pCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) {
...@@ -148,7 +156,7 @@ static void rewriteExprAlias(SNode* pRoot) { ...@@ -148,7 +156,7 @@ static void rewriteExprAlias(SNode* pRoot) {
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) { int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (qIsInsertSql(pCxt->pSql, pCxt->sqlLen)) { if (qIsInsertValuesSql(pCxt->pSql, pCxt->sqlLen)) {
code = parseInsertSql(pCxt, pQuery, NULL); code = parseInsertSql(pCxt, pQuery, NULL);
} else { } else {
code = parseSqlIntoAst(pCxt, pQuery); code = parseSqlIntoAst(pCxt, pQuery);
...@@ -160,7 +168,7 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) { ...@@ -160,7 +168,7 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) {
int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) { int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) {
SParseMetaCache metaCache = {0}; SParseMetaCache metaCache = {0};
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (qIsInsertSql(pCxt->pSql, pCxt->sqlLen)) { if (qIsInsertValuesSql(pCxt->pSql, pCxt->sqlLen)) {
code = parseInsertSyntax(pCxt, pQuery, &metaCache); code = parseInsertSyntax(pCxt, pQuery, &metaCache);
} else { } else {
code = parseSqlSyntax(pCxt, pQuery, &metaCache); code = parseSqlSyntax(pCxt, pQuery, &metaCache);
......
因为 它太大了无法显示 source diff 。你可以改为 查看blob
...@@ -40,6 +40,12 @@ TEST_F(ParserExplainToSyncdbTest, grant) { ...@@ -40,6 +40,12 @@ TEST_F(ParserExplainToSyncdbTest, grant) {
run("GRANT READ, WRITE ON test.* TO wxy"); run("GRANT READ, WRITE ON test.* TO wxy");
} }
TEST_F(ParserExplainToSyncdbTest, insert) {
useDb("root", "test");
run("INSERT INTO t1 SELECT * FROM t1");
}
// todo kill connection // todo kill connection
// todo kill query // todo kill query
// todo kill stream // todo kill stream
......
...@@ -26,6 +26,7 @@ typedef int32_t (*FCreateLogicNode)(SLogicPlanContext*, void*, SLogicNode**); ...@@ -26,6 +26,7 @@ typedef int32_t (*FCreateLogicNode)(SLogicPlanContext*, void*, SLogicNode**);
typedef int32_t (*FCreateSelectLogicNode)(SLogicPlanContext*, SSelectStmt*, SLogicNode**); typedef int32_t (*FCreateSelectLogicNode)(SLogicPlanContext*, SSelectStmt*, SLogicNode**);
typedef int32_t (*FCreateSetOpLogicNode)(SLogicPlanContext*, SSetOperator*, SLogicNode**); typedef int32_t (*FCreateSetOpLogicNode)(SLogicPlanContext*, SSetOperator*, SLogicNode**);
typedef int32_t (*FCreateDeleteLogicNode)(SLogicPlanContext*, SDeleteStmt*, SLogicNode**); typedef int32_t (*FCreateDeleteLogicNode)(SLogicPlanContext*, SDeleteStmt*, SLogicNode**);
typedef int32_t (*FCreateInsertLogicNode)(SLogicPlanContext*, SInsertStmt*, SLogicNode**);
static int32_t doCreateLogicNodeByTable(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SNode* pTable, static int32_t doCreateLogicNodeByTable(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SNode* pTable,
SLogicNode** pLogicNode); SLogicNode** pLogicNode);
...@@ -1262,6 +1263,46 @@ static int32_t createDeleteLogicNode(SLogicPlanContext* pCxt, SDeleteStmt* pDele ...@@ -1262,6 +1263,46 @@ static int32_t createDeleteLogicNode(SLogicPlanContext* pCxt, SDeleteStmt* pDele
return code; return code;
} }
static int32_t creatInsertRootLogicNode(SLogicPlanContext* pCxt, SInsertStmt* pInsert, FCreateInsertLogicNode func,
SLogicNode** pRoot) {
return createRootLogicNode(pCxt, pInsert, pInsert->precision, (FCreateLogicNode)func, pRoot);
}
static int32_t createVnodeModifLogicNodeByInsert(SLogicPlanContext* pCxt, SInsertStmt* pInsert,
SLogicNode** pLogicNode) {
SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY);
if (NULL == pModify) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SRealTableNode* pRealTable = (SRealTableNode*)pInsert->pTable;
pModify->modifyType = MODIFY_TABLE_TYPE_INSERT;
pModify->tableId = pRealTable->pMeta->uid;
pModify->tableType = pRealTable->pMeta->tableType;
snprintf(pModify->tableFName, sizeof(pModify->tableFName), "%d.%s.%s", pCxt->pPlanCxt->acctId,
pRealTable->table.dbName, pRealTable->table.tableName);
*pLogicNode = (SLogicNode*)pModify;
return TSDB_CODE_SUCCESS;
}
static int32_t createInsertLogicNode(SLogicPlanContext* pCxt, SInsertStmt* pInsert, SLogicNode** pLogicNode) {
SLogicNode* pRoot = NULL;
int32_t code = createQueryLogicNode(pCxt, pInsert->pQuery, &pRoot);
if (TSDB_CODE_SUCCESS == code) {
code = creatInsertRootLogicNode(pCxt, pInsert, createVnodeModifLogicNodeByInsert, &pRoot);
}
if (TSDB_CODE_SUCCESS == code) {
*pLogicNode = pRoot;
} else {
nodesDestroyNode((SNode*)pRoot);
}
return code;
}
static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogicNode** pLogicNode) { static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogicNode** pLogicNode) {
switch (nodeType(pStmt)) { switch (nodeType(pStmt)) {
case QUERY_NODE_SELECT_STMT: case QUERY_NODE_SELECT_STMT:
...@@ -1274,6 +1315,8 @@ static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogi ...@@ -1274,6 +1315,8 @@ static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogi
return createSetOperatorLogicNode(pCxt, (SSetOperator*)pStmt, pLogicNode); return createSetOperatorLogicNode(pCxt, (SSetOperator*)pStmt, pLogicNode);
case QUERY_NODE_DELETE_STMT: case QUERY_NODE_DELETE_STMT:
return createDeleteLogicNode(pCxt, (SDeleteStmt*)pStmt, pLogicNode); return createDeleteLogicNode(pCxt, (SDeleteStmt*)pStmt, pLogicNode);
case QUERY_NODE_INSERT_STMT:
return createInsertLogicNode(pCxt, (SInsertStmt*)pStmt, pLogicNode);
default: default:
break; break;
} }
......
...@@ -632,8 +632,8 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren ...@@ -632,8 +632,8 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
} }
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOnConditions) { if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOnConditions) {
code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pOnConditions, code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1,
&pJoin->pOnConditions); pJoinLogicNode->pOnConditions, &pJoin->pOnConditions);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -1496,12 +1496,58 @@ static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubpl ...@@ -1496,12 +1496,58 @@ static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubpl
return pSubplan; return pSubplan;
} }
static int32_t buildInsertSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) { static int32_t buildInsertValuesSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
pSubplan->msgType = pModify->msgType; pSubplan->msgType = pModify->msgType;
pSubplan->execNode.epSet = pModify->pVgDataBlocks->vg.epSet; pSubplan->execNode.epSet = pModify->pVgDataBlocks->vg.epSet;
return createDataInserter(pCxt, pModify->pVgDataBlocks, &pSubplan->pDataSink); return createDataInserter(pCxt, pModify->pVgDataBlocks, &pSubplan->pDataSink);
} }
static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, const SPhysiNode* pRoot,
SDataSinkNode** pSink) {
SQueryInserterNode* pInserter = (SQueryInserterNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT);
if (NULL == pInserter) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pInserter->tableId = pModify->tableId;
pInserter->tableType = pModify->tableType;
strcpy(pInserter->tableFName, pModify->tableFName);
pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId;
pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet;
int32_t code = TSDB_CODE_SUCCESS;
pInserter->sink.pInputDataBlockDesc = (SDataBlockDescNode*)nodesCloneNode((SNode*)pRoot->pOutputDataBlockDesc);
if (NULL == pInserter->sink.pInputDataBlockDesc) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
if (TSDB_CODE_SUCCESS == code) {
*pSink = (SDataSinkNode*)pInserter;
} else {
nodesDestroyNode((SNode*)pInserter);
}
return TSDB_CODE_SUCCESS;
}
static int32_t buildInsertSelectSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
int32_t code =
createPhysiNode(pCxt, (SLogicNode*)nodesListGetNode(pModify->node.pChildren, 0), pSubplan, &pSubplan->pNode);
if (TSDB_CODE_SUCCESS == code) {
code = createQueryInserter(pCxt, pModify, pSubplan->pNode, &pSubplan->pDataSink);
}
pSubplan->msgType = TDMT_VND_SUBMIT;
return code;
}
static int32_t buildInsertSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
if (NULL == pModify->node.pChildren) {
return buildInsertValuesSubplan(pCxt, pModify, pSubplan);
}
return buildInsertSelectSubplan(pCxt, pModify, pSubplan);
}
static int32_t createDataDeleter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, const SPhysiNode* pRoot, static int32_t createDataDeleter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, const SPhysiNode* pRoot,
SDataSinkNode** pSink) { SDataSinkNode** pSink) {
SDataDeleterNode* pDeleter = (SDataDeleterNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DELETE); SDataDeleterNode* pDeleter = (SDataDeleterNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DELETE);
......
...@@ -82,29 +82,41 @@ static int32_t scaleOutByVgroups(SScaleOutContext* pCxt, SLogicSubplan* pSubplan ...@@ -82,29 +82,41 @@ static int32_t scaleOutByVgroups(SScaleOutContext* pCxt, SLogicSubplan* pSubplan
return code; return code;
} }
static int32_t scaleOutForModify(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) { static int32_t scaleOutForMerge(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
return nodesListStrictAppend(pGroup, (SNode*)singleCloneSubLogicPlan(pCxt, pSubplan, level));
}
static int32_t scaleOutForInsertValues(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level,
SNodeList* pGroup) {
SVnodeModifyLogicNode* pNode = (SVnodeModifyLogicNode*)pSubplan->pNode; SVnodeModifyLogicNode* pNode = (SVnodeModifyLogicNode*)pSubplan->pNode;
if (MODIFY_TABLE_TYPE_DELETE == pNode->modifyType) { size_t numOfVgroups = taosArrayGetSize(pNode->pDataBlocks);
return scaleOutByVgroups(pCxt, pSubplan, level, pGroup); for (int32_t i = 0; i < numOfVgroups; ++i) {
} else { SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
size_t numOfVgroups = taosArrayGetSize(pNode->pDataBlocks); if (NULL == pNewSubplan) {
for (int32_t i = 0; i < numOfVgroups; ++i) { return TSDB_CODE_OUT_OF_MEMORY;
SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level); }
if (NULL == pNewSubplan) { ((SVnodeModifyLogicNode*)pNewSubplan->pNode)->pVgDataBlocks = (SVgDataBlocks*)taosArrayGetP(pNode->pDataBlocks, i);
return TSDB_CODE_OUT_OF_MEMORY; if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pGroup, (SNode*)pNewSubplan)) {
} return TSDB_CODE_OUT_OF_MEMORY;
((SVnodeModifyLogicNode*)pNewSubplan->pNode)->pVgDataBlocks =
(SVgDataBlocks*)taosArrayGetP(pNode->pDataBlocks, i);
if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pGroup, (SNode*)pNewSubplan)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} }
return TSDB_CODE_SUCCESS;
} }
return TSDB_CODE_SUCCESS;
} }
static int32_t scaleOutForMerge(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) { static int32_t scaleOutForInsert(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
return nodesListStrictAppend(pGroup, (SNode*)singleCloneSubLogicPlan(pCxt, pSubplan, level)); SVnodeModifyLogicNode* pNode = (SVnodeModifyLogicNode*)pSubplan->pNode;
if (NULL == pNode->node.pChildren) {
return scaleOutForInsertValues(pCxt, pSubplan, level, pGroup);
}
return scaleOutForMerge(pCxt, pSubplan, level, pGroup);
}
static int32_t scaleOutForModify(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
SVnodeModifyLogicNode* pNode = (SVnodeModifyLogicNode*)pSubplan->pNode;
if (MODIFY_TABLE_TYPE_DELETE == pNode->modifyType) {
return scaleOutByVgroups(pCxt, pSubplan, level, pGroup);
}
return scaleOutForInsert(pCxt, pSubplan, level, pGroup);
} }
static int32_t scaleOutForScan(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) { static int32_t scaleOutForScan(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
......
...@@ -91,3 +91,9 @@ TEST_F(PlanOtherTest, delete) { ...@@ -91,3 +91,9 @@ TEST_F(PlanOtherTest, delete) {
run("DELETE FROM st1 WHERE ts > now - 2d and ts < now - 1d AND tag1 = 10"); run("DELETE FROM st1 WHERE ts > now - 2d and ts < now - 1d AND tag1 = 10");
} }
TEST_F(PlanOtherTest, insert) {
useDb("root", "test");
// run("INSERT INTO t1 SELECT * FROM t1");
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册