提交 916dbebb 编写于 作者: H Haojun Liao

other: merge 3.0

......@@ -73,138 +73,138 @@
#define TK_MNODE 55
#define TK_DATABASE 56
#define TK_USE 57
#define TK_FLUSH 58
#define TK_IF 59
#define TK_NOT 60
#define TK_EXISTS 61
#define TK_BUFFER 62
#define TK_CACHELAST 63
#define TK_COMP 64
#define TK_DURATION 65
#define TK_NK_VARIABLE 66
#define TK_FSYNC 67
#define TK_MAXROWS 68
#define TK_MINROWS 69
#define TK_KEEP 70
#define TK_PAGES 71
#define TK_PAGESIZE 72
#define TK_PRECISION 73
#define TK_REPLICA 74
#define TK_STRICT 75
#define TK_WAL 76
#define TK_VGROUPS 77
#define TK_SINGLE_STABLE 78
#define TK_RETENTIONS 79
#define TK_SCHEMALESS 80
#define TK_NK_COLON 81
#define TK_TABLE 82
#define TK_NK_LP 83
#define TK_NK_RP 84
#define TK_STABLE 85
#define TK_ADD 86
#define TK_COLUMN 87
#define TK_MODIFY 88
#define TK_RENAME 89
#define TK_TAG 90
#define TK_SET 91
#define TK_NK_EQ 92
#define TK_USING 93
#define TK_TAGS 94
#define TK_COMMENT 95
#define TK_BOOL 96
#define TK_TINYINT 97
#define TK_SMALLINT 98
#define TK_INT 99
#define TK_INTEGER 100
#define TK_BIGINT 101
#define TK_FLOAT 102
#define TK_DOUBLE 103
#define TK_BINARY 104
#define TK_TIMESTAMP 105
#define TK_NCHAR 106
#define TK_UNSIGNED 107
#define TK_JSON 108
#define TK_VARCHAR 109
#define TK_MEDIUMBLOB 110
#define TK_BLOB 111
#define TK_VARBINARY 112
#define TK_DECIMAL 113
#define TK_MAX_DELAY 114
#define TK_WATERMARK 115
#define TK_ROLLUP 116
#define TK_TTL 117
#define TK_SMA 118
#define TK_FIRST 119
#define TK_LAST 120
#define TK_SHOW 121
#define TK_DATABASES 122
#define TK_TABLES 123
#define TK_STABLES 124
#define TK_MNODES 125
#define TK_MODULES 126
#define TK_QNODES 127
#define TK_FUNCTIONS 128
#define TK_INDEXES 129
#define TK_ACCOUNTS 130
#define TK_APPS 131
#define TK_CONNECTIONS 132
#define TK_LICENCE 133
#define TK_GRANTS 134
#define TK_QUERIES 135
#define TK_SCORES 136
#define TK_TOPICS 137
#define TK_VARIABLES 138
#define TK_BNODES 139
#define TK_SNODES 140
#define TK_CLUSTER 141
#define TK_TRANSACTIONS 142
#define TK_DISTRIBUTED 143
#define TK_CONSUMERS 144
#define TK_SUBSCRIPTIONS 145
#define TK_LIKE 146
#define TK_INDEX 147
#define TK_FUNCTION 148
#define TK_INTERVAL 149
#define TK_TOPIC 150
#define TK_AS 151
#define TK_WITH 152
#define TK_META 153
#define TK_CONSUMER 154
#define TK_GROUP 155
#define TK_DESC 156
#define TK_DESCRIBE 157
#define TK_RESET 158
#define TK_QUERY 159
#define TK_CACHE 160
#define TK_EXPLAIN 161
#define TK_ANALYZE 162
#define TK_VERBOSE 163
#define TK_NK_BOOL 164
#define TK_RATIO 165
#define TK_NK_FLOAT 166
#define TK_COMPACT 167
#define TK_VNODES 168
#define TK_IN 169
#define TK_OUTPUTTYPE 170
#define TK_AGGREGATE 171
#define TK_BUFSIZE 172
#define TK_STREAM 173
#define TK_INTO 174
#define TK_TRIGGER 175
#define TK_AT_ONCE 176
#define TK_WINDOW_CLOSE 177
#define TK_IGNORE 178
#define TK_EXPIRED 179
#define TK_KILL 180
#define TK_CONNECTION 181
#define TK_TRANSACTION 182
#define TK_BALANCE 183
#define TK_VGROUP 184
#define TK_MERGE 185
#define TK_REDISTRIBUTE 186
#define TK_SPLIT 187
#define TK_SYNCDB 188
#define TK_DELETE 189
#define TK_IF 58
#define TK_NOT 59
#define TK_EXISTS 60
#define TK_BUFFER 61
#define TK_CACHELAST 62
#define TK_COMP 63
#define TK_DURATION 64
#define TK_NK_VARIABLE 65
#define TK_FSYNC 66
#define TK_MAXROWS 67
#define TK_MINROWS 68
#define TK_KEEP 69
#define TK_PAGES 70
#define TK_PAGESIZE 71
#define TK_PRECISION 72
#define TK_REPLICA 73
#define TK_STRICT 74
#define TK_WAL 75
#define TK_VGROUPS 76
#define TK_SINGLE_STABLE 77
#define TK_RETENTIONS 78
#define TK_SCHEMALESS 79
#define TK_NK_COLON 80
#define TK_TABLE 81
#define TK_NK_LP 82
#define TK_NK_RP 83
#define TK_STABLE 84
#define TK_ADD 85
#define TK_COLUMN 86
#define TK_MODIFY 87
#define TK_RENAME 88
#define TK_TAG 89
#define TK_SET 90
#define TK_NK_EQ 91
#define TK_USING 92
#define TK_TAGS 93
#define TK_COMMENT 94
#define TK_BOOL 95
#define TK_TINYINT 96
#define TK_SMALLINT 97
#define TK_INT 98
#define TK_INTEGER 99
#define TK_BIGINT 100
#define TK_FLOAT 101
#define TK_DOUBLE 102
#define TK_BINARY 103
#define TK_TIMESTAMP 104
#define TK_NCHAR 105
#define TK_UNSIGNED 106
#define TK_JSON 107
#define TK_VARCHAR 108
#define TK_MEDIUMBLOB 109
#define TK_BLOB 110
#define TK_VARBINARY 111
#define TK_DECIMAL 112
#define TK_MAX_DELAY 113
#define TK_WATERMARK 114
#define TK_ROLLUP 115
#define TK_TTL 116
#define TK_SMA 117
#define TK_FIRST 118
#define TK_LAST 119
#define TK_SHOW 120
#define TK_DATABASES 121
#define TK_TABLES 122
#define TK_STABLES 123
#define TK_MNODES 124
#define TK_MODULES 125
#define TK_QNODES 126
#define TK_FUNCTIONS 127
#define TK_INDEXES 128
#define TK_ACCOUNTS 129
#define TK_APPS 130
#define TK_CONNECTIONS 131
#define TK_LICENCE 132
#define TK_GRANTS 133
#define TK_QUERIES 134
#define TK_SCORES 135
#define TK_TOPICS 136
#define TK_VARIABLES 137
#define TK_BNODES 138
#define TK_SNODES 139
#define TK_CLUSTER 140
#define TK_TRANSACTIONS 141
#define TK_DISTRIBUTED 142
#define TK_CONSUMERS 143
#define TK_SUBSCRIPTIONS 144
#define TK_LIKE 145
#define TK_INDEX 146
#define TK_FUNCTION 147
#define TK_INTERVAL 148
#define TK_TOPIC 149
#define TK_AS 150
#define TK_WITH 151
#define TK_META 152
#define TK_CONSUMER 153
#define TK_GROUP 154
#define TK_DESC 155
#define TK_DESCRIBE 156
#define TK_RESET 157
#define TK_QUERY 158
#define TK_CACHE 159
#define TK_EXPLAIN 160
#define TK_ANALYZE 161
#define TK_VERBOSE 162
#define TK_NK_BOOL 163
#define TK_RATIO 164
#define TK_NK_FLOAT 165
#define TK_COMPACT 166
#define TK_VNODES 167
#define TK_IN 168
#define TK_OUTPUTTYPE 169
#define TK_AGGREGATE 170
#define TK_BUFSIZE 171
#define TK_STREAM 172
#define TK_INTO 173
#define TK_TRIGGER 174
#define TK_AT_ONCE 175
#define TK_WINDOW_CLOSE 176
#define TK_IGNORE 177
#define TK_EXPIRED 178
#define TK_KILL 179
#define TK_CONNECTION 180
#define TK_TRANSACTION 181
#define TK_BALANCE 182
#define TK_VGROUP 183
#define TK_MERGE 184
#define TK_REDISTRIBUTE 185
#define TK_SPLIT 186
#define TK_SYNCDB 187
#define TK_DELETE 188
#define TK_INSERT 189
#define TK_NULL 190
#define TK_NK_QUESTION 191
#define TK_NK_ARROW 192
......@@ -263,11 +263,10 @@
#define TK_NULLS 245
#define TK_ID 246
#define TK_NK_BITNOT 247
#define TK_INSERT 248
#define TK_VALUES 249
#define TK_IMPORT 250
#define TK_NK_SEMI 251
#define TK_FILE 252
#define TK_VALUES 248
#define TK_IMPORT 249
#define TK_NK_SEMI 250
#define TK_FILE 251
#define TK_NK_SPACE 300
#define TK_NK_COMMENT 301
......
......@@ -195,6 +195,7 @@ typedef enum ENodeType {
QUERY_NODE_KILL_QUERY_STMT,
QUERY_NODE_KILL_TRANSACTION_STMT,
QUERY_NODE_DELETE_STMT,
QUERY_NODE_INSERT_STMT,
QUERY_NODE_QUERY,
// logic plan node
......@@ -248,6 +249,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC,
QUERY_NODE_PHYSICAL_PLAN_DISPATCH,
QUERY_NODE_PHYSICAL_PLAN_INSERT,
QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT,
QUERY_NODE_PHYSICAL_PLAN_DELETE,
QUERY_NODE_PHYSICAL_SUBPLAN,
QUERY_NODE_PHYSICAL_PLAN
......
......@@ -131,6 +131,7 @@ typedef struct SVnodeModifyLogicNode {
int8_t tableType; // table type
char tableFName[TSDB_TABLE_FNAME_LEN];
STimeWindow deleteTimeRange;
SVgroupsInfo* pVgroupList;
} SVnodeModifyLogicNode;
typedef struct SExchangeLogicNode {
......@@ -456,6 +457,15 @@ typedef struct SDataInserterNode {
char* pData;
} SDataInserterNode;
typedef struct SQueryInserterNode {
SDataSinkNode sink;
uint64_t tableId;
int8_t tableType; // table type
char tableFName[TSDB_TABLE_FNAME_LEN];
int32_t vgId;
SEpSet epSet;
} SQueryInserterNode;
typedef struct SDataDeleterNode {
SDataSinkNode sink;
uint64_t tableId;
......
......@@ -302,6 +302,14 @@ typedef struct SDeleteStmt {
bool deleteZeroRows;
} SDeleteStmt;
typedef struct SInsertStmt {
ENodeType type; // QUERY_NODE_INSERT_STMT
SNode* pTable;
SNodeList* pCols;
SNode* pQuery;
uint8_t precision;
} SInsertStmt;
typedef enum {
PAYLOAD_TYPE_KV = 0,
PAYLOAD_TYPE_RAW = 1,
......
......@@ -56,7 +56,7 @@ typedef struct SParseContext {
} SParseContext;
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);
bool qIsInsertSql(const char* pStr, size_t length);
bool qIsInsertValuesSql(const char* pStr, size_t length);
// for async mode
int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq);
......
......@@ -396,8 +396,7 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
.requestId = pStmt->exec.pRequest->requestId,
.requestObjRefId = pStmt->exec.pRequest->self,
.mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
int32_t code =
catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
int32_t code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
......@@ -849,7 +848,7 @@ int stmtIsInsert(TAOS_STMT* stmt, int* insert) {
if (pStmt->sql.type) {
*insert = (STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type);
} else {
*insert = qIsInsertSql(pStmt->sql.sqlStr, 0);
*insert = qIsInsertValuesSql(pStmt->sql.sqlStr, pStmt->sql.sqlLen);
}
return TSDB_CODE_SUCCESS;
......@@ -919,7 +918,6 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
return TSDB_CODE_SUCCESS;
}
int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
STscStmt* pStmt = (STscStmt*)stmt;
......@@ -952,7 +950,7 @@ int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
return TSDB_CODE_SUCCESS;
}
int stmtGetParam(TAOS_STMT *stmt, int idx, int *type, int *bytes) {
int stmtGetParam(TAOS_STMT* stmt, int idx, int* type, int* bytes) {
STscStmt* pStmt = (STscStmt*)stmt;
if (STMT_TYPE_QUERY == pStmt->sql.type) {
......@@ -980,7 +978,7 @@ int stmtGetParam(TAOS_STMT *stmt, int idx, int *type, int *bytes) {
}
int32_t nums = 0;
TAOS_FIELD_E *pField = NULL;
TAOS_FIELD_E* pField = NULL;
STMT_ERR_RET(stmtFetchColFields(stmt, &nums, &pField));
if (idx >= nums) {
tscError("idx %d is too big", idx);
......
......@@ -165,7 +165,7 @@ static int32_t valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) {
memcpy(pDst->datum.p, pSrc->datum.p, len);
break;
}
case TSDB_DATA_TYPE_JSON:{
case TSDB_DATA_TYPE_JSON: {
int32_t len = getJsonValueLen(pSrc->datum.p);
pDst->datum.p = taosMemoryCalloc(1, len);
if (NULL == pDst->datum.p) {
......@@ -397,6 +397,7 @@ static int32_t logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModi
COPY_SCALAR_FIELD(tableType);
COPY_CHAR_ARRAY_FIELD(tableFName);
COPY_OBJECT_FIELD(deleteTimeRange, sizeof(STimeWindow));
CLONE_OBJECT_FIELD(pVgroupList, vgroupsInfoClone);
return TSDB_CODE_SUCCESS;
}
......
......@@ -19,8 +19,8 @@
#include "query.h"
#include "querynodes.h"
#include "taoserror.h"
#include "tjson.h"
#include "tdatablock.h"
#include "tjson.h"
static int32_t nodeToJson(const void* pObj, SJson* pJson);
static int32_t jsonToNode(const SJson* pJson, void* pObj);
......@@ -179,6 +179,8 @@ const char* nodesNodeName(ENodeType type) {
return "ShowVnodeStmt";
case QUERY_NODE_DELETE_STMT:
return "DeleteStmt";
case QUERY_NODE_INSERT_STMT:
return "InsertStmt";
case QUERY_NODE_LOGIC_PLAN_SCAN:
return "LogicScan";
case QUERY_NODE_LOGIC_PLAN_JOIN:
......@@ -271,6 +273,8 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiDispatch";
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
return "PhysiInsert";
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
return "PhysiQueryInsert";
case QUERY_NODE_PHYSICAL_PLAN_DELETE:
return "PhysiDelete";
case QUERY_NODE_PHYSICAL_SUBPLAN:
......@@ -2210,6 +2214,58 @@ static int32_t physiDispatchNodeToJson(const void* pObj, SJson* pJson) { return
static int32_t jsonToPhysiDispatchNode(const SJson* pJson, void* pObj) { return jsonToPhysicDataSinkNode(pJson, pObj); }
static const char* jkQueryInsertPhysiPlanTableId = "TableId";
static const char* jkQueryInsertPhysiPlanTableType = "TableType";
static const char* jkQueryInsertPhysiPlanTableFName = "TableFName";
static const char* jkQueryInsertPhysiPlanVgId = "VgId";
static const char* jkQueryInsertPhysiPlanEpSet = "EpSet";
static int32_t physiQueryInsertNodeToJson(const void* pObj, SJson* pJson) {
const SQueryInserterNode* pNode = (const SQueryInserterNode*)pObj;
int32_t code = physicDataSinkNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanTableId, pNode->tableId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanTableType, pNode->tableType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkQueryInsertPhysiPlanTableFName, pNode->tableFName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkQueryInsertPhysiPlanVgId, pNode->vgId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkQueryInsertPhysiPlanEpSet, epSetToJson, &pNode->epSet);
}
return code;
}
static int32_t jsonToPhysiQueryInsertNode(const SJson* pJson, void* pObj) {
SQueryInserterNode* pNode = (SQueryInserterNode*)pObj;
int32_t code = jsonToPhysicDataSinkNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetUBigIntValue(pJson, jkQueryInsertPhysiPlanTableId, &pNode->tableId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkQueryInsertPhysiPlanTableType, &pNode->tableType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkQueryInsertPhysiPlanTableFName, pNode->tableFName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkQueryInsertPhysiPlanVgId, &pNode->vgId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkQueryInsertPhysiPlanEpSet, jsonToEpSet, &pNode->epSet);
}
return code;
}
static const char* jkDeletePhysiPlanTableId = "TableId";
static const char* jkDeletePhysiPlanTableType = "TableType";
static const char* jkDeletePhysiPlanTableFName = "TableFName";
......@@ -2641,9 +2697,9 @@ static int32_t datumToJson(const void* pObj, SJson* pJson) {
case TSDB_DATA_TYPE_VARBINARY:
code = tjsonAddStringToObject(pJson, jkValueDatum, varDataVal(pNode->datum.p));
break;
case TSDB_DATA_TYPE_JSON:{
case TSDB_DATA_TYPE_JSON: {
int32_t len = getJsonValueLen(pNode->datum.p);
char* buf = taosMemoryCalloc( len * 2 + 1, sizeof(char));
char* buf = taosMemoryCalloc(len * 2 + 1, sizeof(char));
code = taosHexEncode(pNode->datum.p, buf, len);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(buf);
......@@ -2775,7 +2831,7 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) {
}
break;
}
case TSDB_DATA_TYPE_JSON:{
case TSDB_DATA_TYPE_JSON: {
pNode->datum.p = taosMemoryCalloc(1, pNode->node.resType.bytes);
if (NULL == pNode->datum.p) {
code = TSDB_CODE_OUT_OF_MEMORY;
......@@ -4232,6 +4288,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return physiDispatchNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
break;
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
return physiQueryInsertNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_DELETE:
return physiDeleteNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_SUBPLAN:
......@@ -4374,6 +4432,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToPhysiInterpFuncNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return jsonToPhysiDispatchNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
return jsonToPhysiQueryInsertNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_DELETE:
return jsonToPhysiDeleteNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_SUBPLAN:
......
......@@ -231,6 +231,8 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SKillStmt));
case QUERY_NODE_DELETE_STMT:
return makeNode(type, sizeof(SDeleteStmt));
case QUERY_NODE_INSERT_STMT:
return makeNode(type, sizeof(SInsertStmt));
case QUERY_NODE_QUERY:
return makeNode(type, sizeof(SQuery));
case QUERY_NODE_LOGIC_PLAN_SCAN:
......@@ -327,6 +329,8 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SDataDispatcherNode));
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
return makeNode(type, sizeof(SDataInserterNode));
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
return makeNode(type, sizeof(SQueryInserterNode));
case QUERY_NODE_PHYSICAL_PLAN_DELETE:
return makeNode(type, sizeof(SDataDeleterNode));
case QUERY_NODE_PHYSICAL_SUBPLAN:
......@@ -694,6 +698,13 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pStmt->pTagCond);
break;
}
case QUERY_NODE_INSERT_STMT: {
SInsertStmt* pStmt = (SInsertStmt*)pNode;
nodesDestroyNode(pStmt->pTable);
nodesDestroyList(pStmt->pCols);
nodesDestroyNode(pStmt->pQuery);
break;
}
case QUERY_NODE_QUERY: {
SQuery* pQuery = (SQuery*)pNode;
nodesDestroyNode(pQuery->pRoot);
......@@ -929,6 +940,11 @@ void nodesDestroyNode(SNode* pNode) {
taosMemoryFreeClear(pSink->pData);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
SQueryInserterNode* pSink = (SQueryInserterNode*)pNode;
destroyDataSinkNode((SDataSinkNode*)pSink);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
SDataDeleterNode* pSink = (SDataDeleterNode*)pNode;
destroyDataSinkNode((SDataSinkNode*)pSink);
......@@ -1528,7 +1544,6 @@ int32_t nodesCollectColumnsFromNode(SNode* node, const char* pTableAlias, EColle
}
return TSDB_CODE_SUCCESS;
}
typedef struct SCollectFuncsCxt {
......
......@@ -211,6 +211,7 @@ SNode* createSyncdbStmt(SAstCreateContext* pCxt, const SToken* pDbName);
SNode* createGrantStmt(SAstCreateContext* pCxt, int64_t privileges, SToken* pDbName, SToken* pUserName);
SNode* createRevokeStmt(SAstCreateContext* pCxt, int64_t privileges, SToken* pDbName, SToken* pUserName);
SNode* createDeleteStmt(SAstCreateContext* pCxt, SNode* pTable, SNode* pWhere);
SNode* createInsertStmt(SAstCreateContext* pCxt, SNode* pTable, SNodeList* pCols, SNode* pQuery);
#ifdef __cplusplus
}
......
......@@ -260,7 +260,7 @@ multi_create_clause(A) ::= multi_create_clause(B) create_subtable_clause(C).
create_subtable_clause(A) ::=
not_exists_opt(B) full_table_name(C) USING full_table_name(D)
specific_tags_opt(E) TAGS NK_LP expression_list(F) NK_RP table_options(G). { A = createCreateSubTableClause(pCxt, B, C, D, E, F, G); }
specific_cols_opt(E) TAGS NK_LP expression_list(F) NK_RP table_options(G). { A = createCreateSubTableClause(pCxt, B, C, D, E, F, G); }
%type multi_drop_clause { SNodeList* }
%destructor multi_drop_clause { nodesDestroyList($$); }
......@@ -269,10 +269,10 @@ multi_drop_clause(A) ::= multi_drop_clause(B) drop_table_clause(C).
drop_table_clause(A) ::= exists_opt(B) full_table_name(C). { A = createDropTableClause(pCxt, B, C); }
%type specific_tags_opt { SNodeList* }
%destructor specific_tags_opt { nodesDestroyList($$); }
specific_tags_opt(A) ::= . { A = NULL; }
specific_tags_opt(A) ::= NK_LP col_name_list(B) NK_RP. { A = B; }
%type specific_cols_opt { SNodeList* }
%destructor specific_cols_opt { nodesDestroyList($$); }
specific_cols_opt(A) ::= . { A = NULL; }
specific_cols_opt(A) ::= NK_LP col_name_list(B) NK_RP. { A = B; }
full_table_name(A) ::= table_name(B). { A = createRealTableNode(pCxt, NULL, &B, NULL); }
full_table_name(A) ::= db_name(B) NK_DOT table_name(C). { A = createRealTableNode(pCxt, &B, &C, NULL); }
......@@ -516,6 +516,9 @@ cmd ::= DELETE FROM full_table_name(A) where_clause_opt(B).
/************************************************ select **************************************************************/
cmd ::= query_expression(A). { pCxt->pRootNode = A; }
/************************************************ insert **************************************************************/
cmd ::= INSERT INTO full_table_name(A) specific_cols_opt(B) query_expression(C). { pCxt->pRootNode = createInsertStmt(pCxt, A, B, C); }
/************************************************ literal *************************************************************/
literal(A) ::= NK_INTEGER(B). { A = createRawExprNode(pCxt, &B, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &B)); }
literal(A) ::= NK_FLOAT(B). { A = createRawExprNode(pCxt, &B, createValueNode(pCxt, TSDB_DATA_TYPE_DOUBLE, &B)); }
......@@ -974,4 +977,4 @@ null_ordering_opt(A) ::= .
null_ordering_opt(A) ::= NULLS FIRST. { A = NULL_ORDER_FIRST; }
null_ordering_opt(A) ::= NULLS LAST. { A = NULL_ORDER_LAST; }
%fallback ID NK_BITNOT INSERT VALUES IMPORT NK_SEMI FILE.
%fallback ID NK_BITNOT VALUES IMPORT NK_SEMI FILE.
......@@ -1673,3 +1673,13 @@ SNode* createDeleteStmt(SAstCreateContext* pCxt, SNode* pTable, SNode* pWhere) {
}
return (SNode*)pStmt;
}
SNode* createInsertStmt(SAstCreateContext* pCxt, SNode* pTable, SNodeList* pCols, SNode* pQuery) {
CHECK_PARSER_STATUS(pCxt);
SInsertStmt* pStmt = (SInsertStmt*)nodesMakeNode(QUERY_NODE_INSERT_STMT);
CHECK_OUT_OF_MEM(pStmt);
pStmt->pTable = pTable;
pStmt->pCols = pCols;
pStmt->pQuery = pQuery;
return (SNode*)pStmt;
}
......@@ -451,6 +451,14 @@ static int32_t collectMetaKeyFromDelete(SCollectMetaKeyCxt* pCxt, SDeleteStmt* p
return collectMetaKeyFromRealTableImpl(pCxt, (SRealTableNode*)pStmt->pFromTable, AUTH_TYPE_WRITE);
}
static int32_t collectMetaKeyFromInsert(SCollectMetaKeyCxt* pCxt, SInsertStmt* pStmt) {
int32_t code = collectMetaKeyFromRealTableImpl(pCxt, (SRealTableNode*)pStmt->pTable, AUTH_TYPE_WRITE);
if (TSDB_CODE_SUCCESS == code) {
code = collectMetaKeyFromQuery(pCxt, pStmt->pQuery);
}
return code;
}
static int32_t collectMetaKeyFromShowBlockDist(SCollectMetaKeyCxt* pCxt, SShowTableDistributedStmt* pStmt) {
SName name = {.type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId};
strcpy(name.dbname, pStmt->dbName);
......@@ -560,6 +568,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_DELETE_STMT:
return collectMetaKeyFromDelete(pCxt, (SDeleteStmt*)pStmt);
case QUERY_NODE_INSERT_STMT:
return collectMetaKeyFromInsert(pCxt, (SInsertStmt*)pStmt);
case QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT:
return collectMetaKeyFromShowBlockDist(pCxt, (SShowTableDistributedStmt*)pStmt);
case QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT:
......
......@@ -88,6 +88,14 @@ static int32_t authDelete(SAuthCxt* pCxt, SDeleteStmt* pDelete) {
return checkAuth(pCxt, ((SRealTableNode*)pDelete->pFromTable)->table.dbName, AUTH_TYPE_WRITE);
}
static int32_t authInsert(SAuthCxt* pCxt, SInsertStmt* pInsert) {
int32_t code = checkAuth(pCxt, ((SRealTableNode*)pInsert->pTable)->table.dbName, AUTH_TYPE_WRITE);
if (TSDB_CODE_SUCCESS == code) {
code = authQuery(pCxt, pInsert->pQuery);
}
return code;
}
static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
switch (nodeType(pStmt)) {
case QUERY_NODE_SET_OPERATOR:
......@@ -98,6 +106,8 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
return authDropUser(pCxt, (SDropUserStmt*)pStmt);
case QUERY_NODE_DELETE_STMT:
return authDelete(pCxt, (SDeleteStmt*)pStmt);
case QUERY_NODE_INSERT_STMT:
return authInsert(pCxt, (SInsertStmt*)pStmt);
default:
break;
}
......
......@@ -300,6 +300,14 @@ static int32_t calcConstDelete(SCalcConstContext* pCxt, SDeleteStmt* pDelete) {
return code;
}
static int32_t calcConstInsert(SCalcConstContext* pCxt, SInsertStmt* pInsert) {
int32_t code = calcConstFromTable(pCxt, pInsert->pTable);
if (TSDB_CODE_SUCCESS == code) {
code = calcConstQuery(pCxt, pInsert->pQuery, false);
}
return code;
}
static int32_t calcConstQuery(SCalcConstContext* pCxt, SNode* pStmt, bool subquery) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pStmt)) {
......@@ -320,6 +328,9 @@ static int32_t calcConstQuery(SCalcConstContext* pCxt, SNode* pStmt, bool subque
case QUERY_NODE_DELETE_STMT:
code = calcConstDelete(pCxt, (SDeleteStmt*)pStmt);
break;
case QUERY_NODE_INSERT_STMT:
code = calcConstInsert(pCxt, (SInsertStmt*)pStmt);
break;
default:
break;
}
......
......@@ -110,7 +110,7 @@ typedef struct SMemParam {
static int32_t skipInsertInto(char** pSql, SMsgBuf* pMsg) {
SToken sToken;
NEXT_TOKEN(*pSql, sToken);
if (TK_INSERT != sToken.type) {
if (TK_INSERT != sToken.type && TK_IMPORT != sToken.type) {
return buildSyntaxErrMsg(pMsg, "keyword INSERT is expected", sToken.z);
}
NEXT_TOKEN(*pSql, sToken);
......
......@@ -2839,6 +2839,21 @@ static int32_t translateDelete(STranslateContext* pCxt, SDeleteStmt* pDelete) {
return code;
}
static int32_t translateInsert(STranslateContext* pCxt, SInsertStmt* pInsert) {
pCxt->pCurrStmt = (SNode*)pInsert;
int32_t code = translateFrom(pCxt, pInsert->pTable);
if (TSDB_CODE_SUCCESS == code) {
code = translateExprList(pCxt, pInsert->pCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = resetTranslateNamespace(pCxt);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateQuery(pCxt, pInsert->pQuery);
}
return code;
}
static int64_t getUnitPerMinute(uint8_t precision) {
switch (precision) {
case TSDB_TIME_PRECISION_MILLI:
......@@ -4608,6 +4623,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_DELETE_STMT:
code = translateDelete(pCxt, (SDeleteStmt*)pNode);
break;
case QUERY_NODE_INSERT_STMT:
code = translateInsert(pCxt, (SInsertStmt*)pNode);
break;
case QUERY_NODE_CREATE_DATABASE_STMT:
code = translateCreateDatabase(pCxt, (SCreateDatabaseStmt*)pNode);
break;
......@@ -6288,6 +6306,10 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = TDMT_VND_DELETE;
break;
case QUERY_NODE_INSERT_STMT:
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = TDMT_VND_SUBMIT;
break;
case QUERY_NODE_VNODE_MODIF_STMT:
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = toMsgType(((SVnodeModifOpStmt*)pQuery->pRoot)->sqlNodeType);
......
......@@ -19,19 +19,28 @@
#include "parInt.h"
#include "parToken.h"
bool qIsInsertSql(const char* pStr, size_t length) {
bool qIsInsertValuesSql(const char* pStr, size_t length) {
if (NULL == pStr) {
return false;
}
const char* pSql = pStr;
int32_t index = 0;
SToken t = tStrGetToken((char*)pStr, &index, false);
if (TK_INSERT != t.type && TK_IMPORT != t.type) {
return false;
}
do {
SToken t0 = tStrGetToken((char*)pStr, &index, false);
if (t0.type != TK_NK_LP) {
return t0.type == TK_INSERT || t0.type == TK_IMPORT;
pStr += index;
index = 0;
t = tStrGetToken((char*)pStr, &index, false);
if (TK_USING == t.type || TK_VALUES == t.type) {
return true;
}
} while (1);
} while (pStr - pSql < length);
return false;
}
static int32_t analyseSemantic(SParseContext* pCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) {
......@@ -148,7 +157,7 @@ static void rewriteExprAlias(SNode* pRoot) {
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) {
int32_t code = TSDB_CODE_SUCCESS;
if (qIsInsertSql(pCxt->pSql, pCxt->sqlLen)) {
if (qIsInsertValuesSql(pCxt->pSql, pCxt->sqlLen)) {
code = parseInsertSql(pCxt, pQuery, NULL);
} else {
code = parseSqlIntoAst(pCxt, pQuery);
......@@ -160,7 +169,7 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) {
int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) {
SParseMetaCache metaCache = {0};
int32_t code = TSDB_CODE_SUCCESS;
if (qIsInsertSql(pCxt->pSql, pCxt->sqlLen)) {
if (qIsInsertValuesSql(pCxt->pSql, pCxt->sqlLen)) {
code = parseInsertSyntax(pCxt, pQuery, &metaCache);
} else {
code = parseSqlSyntax(pCxt, pQuery, &metaCache);
......
因为 它太大了无法显示 source diff 。你可以改为 查看blob
......@@ -40,6 +40,12 @@ TEST_F(ParserExplainToSyncdbTest, grant) {
run("GRANT READ, WRITE ON test.* TO wxy");
}
TEST_F(ParserExplainToSyncdbTest, insert) {
useDb("root", "test");
run("INSERT INTO t1 SELECT * FROM t1");
}
// todo kill connection
// todo kill query
// todo kill stream
......
......@@ -26,6 +26,7 @@ typedef int32_t (*FCreateLogicNode)(SLogicPlanContext*, void*, SLogicNode**);
typedef int32_t (*FCreateSelectLogicNode)(SLogicPlanContext*, SSelectStmt*, SLogicNode**);
typedef int32_t (*FCreateSetOpLogicNode)(SLogicPlanContext*, SSetOperator*, SLogicNode**);
typedef int32_t (*FCreateDeleteLogicNode)(SLogicPlanContext*, SDeleteStmt*, SLogicNode**);
typedef int32_t (*FCreateInsertLogicNode)(SLogicPlanContext*, SInsertStmt*, SLogicNode**);
static int32_t doCreateLogicNodeByTable(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SNode* pTable,
SLogicNode** pLogicNode);
......@@ -1264,6 +1265,47 @@ static int32_t createDeleteLogicNode(SLogicPlanContext* pCxt, SDeleteStmt* pDele
return code;
}
static int32_t creatInsertRootLogicNode(SLogicPlanContext* pCxt, SInsertStmt* pInsert, FCreateInsertLogicNode func,
SLogicNode** pRoot) {
return createRootLogicNode(pCxt, pInsert, pInsert->precision, (FCreateLogicNode)func, pRoot);
}
static int32_t createVnodeModifLogicNodeByInsert(SLogicPlanContext* pCxt, SInsertStmt* pInsert,
SLogicNode** pLogicNode) {
SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY);
if (NULL == pModify) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SRealTableNode* pRealTable = (SRealTableNode*)pInsert->pTable;
pModify->modifyType = MODIFY_TABLE_TYPE_INSERT;
pModify->tableId = pRealTable->pMeta->uid;
pModify->tableType = pRealTable->pMeta->tableType;
snprintf(pModify->tableFName, sizeof(pModify->tableFName), "%d.%s.%s", pCxt->pPlanCxt->acctId,
pRealTable->table.dbName, pRealTable->table.tableName);
TSWAP(pModify->pVgroupList, pRealTable->pVgroupList);
*pLogicNode = (SLogicNode*)pModify;
return TSDB_CODE_SUCCESS;
}
static int32_t createInsertLogicNode(SLogicPlanContext* pCxt, SInsertStmt* pInsert, SLogicNode** pLogicNode) {
SLogicNode* pRoot = NULL;
int32_t code = createQueryLogicNode(pCxt, pInsert->pQuery, &pRoot);
if (TSDB_CODE_SUCCESS == code) {
code = creatInsertRootLogicNode(pCxt, pInsert, createVnodeModifLogicNodeByInsert, &pRoot);
}
if (TSDB_CODE_SUCCESS == code) {
*pLogicNode = pRoot;
} else {
nodesDestroyNode((SNode*)pRoot);
}
return code;
}
static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogicNode** pLogicNode) {
switch (nodeType(pStmt)) {
case QUERY_NODE_SELECT_STMT:
......@@ -1276,6 +1318,8 @@ static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogi
return createSetOperatorLogicNode(pCxt, (SSetOperator*)pStmt, pLogicNode);
case QUERY_NODE_DELETE_STMT:
return createDeleteLogicNode(pCxt, (SDeleteStmt*)pStmt, pLogicNode);
case QUERY_NODE_INSERT_STMT:
return createInsertLogicNode(pCxt, (SInsertStmt*)pStmt, pLogicNode);
default:
break;
}
......
......@@ -485,7 +485,7 @@ static int32_t pushDownCondOptPushCondToProject(SOptimizeContext* pCxt, SProject
return pushDownCondOptAppendCond(&pProject->node.pConditions, pCond);
}
static int32_t pushDownCondOptPushCondToJoin(SOptimizeContext* pCxt, SJoinLogicNode * pJoin, SNode** pCond) {
static int32_t pushDownCondOptPushCondToJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode** pCond) {
return pushDownCondOptAppendCond(&pJoin->node.pConditions, pCond);
}
......@@ -821,7 +821,7 @@ static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAg
typedef struct SRewriteProjCondContext {
SProjectLogicNode* pProj;
int32_t errCode;
}SRewriteProjCondContext;
} SRewriteProjCondContext;
static EDealRes rewriteProjectCondForPushDownImpl(SNode** ppNode, void* pContext) {
SRewriteProjCondContext* pCxt = pContext;
......@@ -849,7 +849,8 @@ static EDealRes rewriteProjectCondForPushDownImpl(SNode** ppNode, void* pContext
return DEAL_RES_CONTINUE;
}
static int32_t rewriteProjectCondForPushDown(SOptimizeContext* pCxt, SProjectLogicNode* pProject, SNode** ppProjectCond) {
static int32_t rewriteProjectCondForPushDown(SOptimizeContext* pCxt, SProjectLogicNode* pProject,
SNode** ppProjectCond) {
SRewriteProjCondContext cxt = {.pProj = pProject, .errCode = TSDB_CODE_SUCCESS};
SNode* pProjectCond = pProject->node.pConditions;
nodesRewriteExpr(&pProjectCond, rewriteProjectCondForPushDownImpl, &cxt);
......@@ -2082,13 +2083,18 @@ static const int32_t optimizeRuleNum = (sizeof(optimizeRuleSet) / sizeof(SOptimi
static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
char* pStr = NULL;
nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL);
if (NULL == pRuleName) {
qDebugL("before optimize: %s", pStr);
} else {
qDebugL("apply optimize %s rule: %s", pRuleName, pStr);
}
taosMemoryFree(pStr);
}
static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
SOptimizeContext cxt = {.pPlanCxt = pCxt, .optimized = false};
bool optimized = false;
dumpLogicSubplan(NULL, pLogicSubplan);
do {
optimized = false;
for (int32_t i = 0; i < optimizeRuleNum; ++i) {
......
......@@ -632,8 +632,8 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
}
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOnConditions) {
code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pOnConditions,
&pJoin->pOnConditions);
code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1,
pJoinLogicNode->pOnConditions, &pJoin->pOnConditions);
}
if (TSDB_CODE_SUCCESS == code) {
......@@ -1496,12 +1496,60 @@ static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubpl
return pSubplan;
}
static int32_t buildInsertSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
static int32_t buildInsertValuesSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
pSubplan->msgType = pModify->msgType;
pSubplan->execNode.epSet = pModify->pVgDataBlocks->vg.epSet;
return createDataInserter(pCxt, pModify->pVgDataBlocks, &pSubplan->pDataSink);
}
static int32_t createQueryInserter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan,
SDataSinkNode** pSink) {
SQueryInserterNode* pInserter = (SQueryInserterNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT);
if (NULL == pInserter) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pInserter->tableId = pModify->tableId;
pInserter->tableType = pModify->tableType;
strcpy(pInserter->tableFName, pModify->tableFName);
pInserter->vgId = pModify->pVgroupList->vgroups[0].vgId;
pInserter->epSet = pModify->pVgroupList->vgroups[0].epSet;
vgroupInfoToNodeAddr(pModify->pVgroupList->vgroups, &pSubplan->execNode);
int32_t code = TSDB_CODE_SUCCESS;
pInserter->sink.pInputDataBlockDesc =
(SDataBlockDescNode*)nodesCloneNode((SNode*)pSubplan->pNode->pOutputDataBlockDesc);
if (NULL == pInserter->sink.pInputDataBlockDesc) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
if (TSDB_CODE_SUCCESS == code) {
*pSink = (SDataSinkNode*)pInserter;
} else {
nodesDestroyNode((SNode*)pInserter);
}
return TSDB_CODE_SUCCESS;
}
static int32_t buildInsertSelectSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
int32_t code =
createPhysiNode(pCxt, (SLogicNode*)nodesListGetNode(pModify->node.pChildren, 0), pSubplan, &pSubplan->pNode);
if (TSDB_CODE_SUCCESS == code) {
code = createQueryInserter(pCxt, pModify, pSubplan, &pSubplan->pDataSink);
}
pSubplan->msgType = TDMT_VND_SUBMIT;
return code;
}
static int32_t buildInsertSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, SSubplan* pSubplan) {
if (NULL == pModify->node.pChildren) {
return buildInsertValuesSubplan(pCxt, pModify, pSubplan);
}
return buildInsertSelectSubplan(pCxt, pModify, pSubplan);
}
static int32_t createDataDeleter(SPhysiPlanContext* pCxt, SVnodeModifyLogicNode* pModify, const SPhysiNode* pRoot,
SDataSinkNode** pSink) {
SDataDeleterNode* pDeleter = (SDataDeleterNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DELETE);
......
......@@ -82,29 +82,41 @@ static int32_t scaleOutByVgroups(SScaleOutContext* pCxt, SLogicSubplan* pSubplan
return code;
}
static int32_t scaleOutForModify(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
static int32_t scaleOutForMerge(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
return nodesListStrictAppend(pGroup, (SNode*)singleCloneSubLogicPlan(pCxt, pSubplan, level));
}
static int32_t scaleOutForInsertValues(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level,
SNodeList* pGroup) {
SVnodeModifyLogicNode* pNode = (SVnodeModifyLogicNode*)pSubplan->pNode;
if (MODIFY_TABLE_TYPE_DELETE == pNode->modifyType) {
return scaleOutByVgroups(pCxt, pSubplan, level, pGroup);
} else {
size_t numOfVgroups = taosArrayGetSize(pNode->pDataBlocks);
for (int32_t i = 0; i < numOfVgroups; ++i) {
SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
if (NULL == pNewSubplan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
((SVnodeModifyLogicNode*)pNewSubplan->pNode)->pVgDataBlocks =
(SVgDataBlocks*)taosArrayGetP(pNode->pDataBlocks, i);
((SVnodeModifyLogicNode*)pNewSubplan->pNode)->pVgDataBlocks = (SVgDataBlocks*)taosArrayGetP(pNode->pDataBlocks, i);
if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pGroup, (SNode*)pNewSubplan)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t scaleOutForInsert(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
SVnodeModifyLogicNode* pNode = (SVnodeModifyLogicNode*)pSubplan->pNode;
if (NULL == pNode->node.pChildren) {
return scaleOutForInsertValues(pCxt, pSubplan, level, pGroup);
}
return scaleOutForMerge(pCxt, pSubplan, level, pGroup);
}
static int32_t scaleOutForMerge(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
return nodesListStrictAppend(pGroup, (SNode*)singleCloneSubLogicPlan(pCxt, pSubplan, level));
static int32_t scaleOutForModify(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
SVnodeModifyLogicNode* pNode = (SVnodeModifyLogicNode*)pSubplan->pNode;
if (MODIFY_TABLE_TYPE_DELETE == pNode->modifyType) {
return scaleOutByVgroups(pCxt, pSubplan, level, pGroup);
}
return scaleOutForInsert(pCxt, pSubplan, level, pGroup);
}
static int32_t scaleOutForScan(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
......
......@@ -20,6 +20,7 @@
#define SPLIT_FLAG_MASK(n) (1 << n)
#define SPLIT_FLAG_STABLE_SPLIT SPLIT_FLAG_MASK(0)
#define SPLIT_FLAG_INSERT_SPLIT SPLIT_FLAG_MASK(1)
#define SPLIT_FLAG_SET_MASK(val, mask) (val) |= (mask)
#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
......@@ -1196,6 +1197,41 @@ static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
return code;
}
typedef struct SInsertSelectSplitInfo {
SLogicNode* pQueryRoot;
SLogicSubplan* pSubplan;
} SInsertSelectSplitInfo;
static bool insSelSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
SInsertSelectSplitInfo* pInfo) {
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pNode) && 1 == LIST_LENGTH(pNode->pChildren) &&
MODIFY_TABLE_TYPE_INSERT == ((SVnodeModifyLogicNode*)pNode)->modifyType) {
pInfo->pQueryRoot = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
pInfo->pSubplan = pSubplan;
return true;
}
return false;
}
static int32_t insertSelectSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
SInsertSelectSplitInfo info = {0};
if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_INSERT_SPLIT, (FSplFindSplitNode)insSelSplFindSplitNode, &info)) {
return TSDB_CODE_SUCCESS;
}
int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pQueryRoot, info.pSubplan->subplanType);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, info.pQueryRoot, 0));
}
if (TSDB_CODE_SUCCESS == code) {
info.pSubplan->subplanType = SUBPLAN_TYPE_MODIFY;
SPLIT_FLAG_SET_MASK(info.pSubplan->splitFlag, SPLIT_FLAG_INSERT_SPLIT);
}
++(pCxt->groupId);
pCxt->split = true;
return code;
}
typedef struct SQnodeSplitInfo {
SLogicNode* pSplitNode;
SLogicSubplan* pSubplan;
......@@ -1249,7 +1285,8 @@ static const SSplitRule splitRuleSet[] = {
{.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit},
{.pName = "UnionAllSplit", .splitFunc = unionAllSplit},
{.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit},
{.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit}
{.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit},
{.pName = "InsertSelectSplit", .splitFunc = insertSelectSplit}
};
// clang-format on
......@@ -1258,7 +1295,11 @@ static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));
static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
char* pStr = NULL;
nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL);
if (NULL == pRuleName) {
qDebugL("before split: %s", pStr);
} else {
qDebugL("apply split %s rule: %s", pRuleName, pStr);
}
taosMemoryFree(pStr);
}
......@@ -1266,6 +1307,7 @@ static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) {
SSplitContext cxt = {
.pPlanCxt = pCxt, .queryId = pSubplan->id.queryId, .groupId = pSubplan->id.groupId + 1, .split = false};
bool split = false;
dumpLogicSubplan(NULL, pSubplan);
do {
split = false;
for (int32_t i = 0; i < splitRuleNum; ++i) {
......@@ -1293,8 +1335,16 @@ static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) {
FOREACH(pChild, pNode->pChildren) { setVgroupsInfo((SLogicNode*)pChild, pSubplan); }
}
static bool needSplitSubplan(SLogicSubplan* pLogicSubplan) {
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY != nodeType(pLogicSubplan->pNode)) {
return true;
}
SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pLogicSubplan->pNode;
return (MODIFY_TABLE_TYPE_INSERT == pModify->modifyType && NULL != pModify->node.pChildren);
}
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pLogicSubplan->pNode)) {
if (!needSplitSubplan(pLogicSubplan)) {
setVgroupsInfo(pLogicSubplan->pNode, pLogicSubplan);
return TSDB_CODE_SUCCESS;
}
......
......@@ -91,3 +91,9 @@ TEST_F(PlanOtherTest, delete) {
run("DELETE FROM st1 WHERE ts > now - 2d and ts < now - 1d AND tag1 = 10");
}
TEST_F(PlanOtherTest, insert) {
useDb("root", "test");
run("INSERT INTO t1 SELECT * FROM t1");
}
......@@ -40,6 +40,9 @@ still_reachable=`cat ${LOG_DIR}/valgrind-taosd-*.log | grep "still reachable in"
definitely_lost=`cat ${LOG_DIR}/valgrind-taosd-*.log | grep "definitely lost in" | wc -l`
indirectly_lost=`cat ${LOG_DIR}/valgrind-taosd-*.log | grep "indirectly lost in " | wc -l`
possibly_lost=`cat ${LOG_DIR}/valgrind-taosd-*.log | grep "possibly lost in " | wc -l`
invalid_read=`cat ${LOG_DIR}/valgrind-taosd-*.log | grep "Invalid read of " | wc -l`
invalid_write=`cat ${LOG_DIR}/valgrind-taosd-*.log | grep "Invalid write of " | wc -l`
invalid_free=`cat ${LOG_DIR}/valgrind-taosd-*.log | grep "Invalid free() " | wc -l`
if [ $DETAIL -eq 1 ]; then
echo error_summary: $error_summary
......@@ -47,7 +50,10 @@ if [ $DETAIL -eq 1 ]; then
echo definitely_lost: $definitely_lost
echo indirectly_lost: $indirectly_lost
echo possibly_lost: $possibly_lost
echo invalid_read: $invalid_read
echo invalid_write: $invalid_write
echo invalid_free: $invalid_free
fi
let "errors=$still_reachable+$error_summary+$definitely_lost+$indirectly_lost+$possibly_lost"
let "errors=$error_summary+$still_reachable+$definitely_lost+$indirectly_lost+$possibly_lost+$invalid_read+$invalid_write+$invalid_free"
echo $errors
......@@ -96,7 +96,7 @@ class TDTestCase:
ff float, dd double, bb binary(100), nn nchar(100), tt timestamp)",
)
# conn.load_table_info("log")
tdLog.debug("statement start")
start = datetime.now()
stmt = conn.statement("insert into stb1 values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
......@@ -118,8 +118,11 @@ class TDTestCase:
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
# print(type(stmt))
tdLog.debug("bind_param_batch start")
stmt.bind_param_batch(params)
tdLog.debug("bind_param_batch end")
stmt.execute()
tdLog.debug("execute end")
end = datetime.now()
print("elapsed time: ", end - start)
assert stmt.affected_rows == 3
......@@ -155,7 +158,7 @@ class TDTestCase:
print(rows1)
assert str(rows1[0][0]) == "2021-07-21 17:56:32.589000"
assert rows1[0][10] == 3
tdLog.debug("close start")
stmt.close()
......
......@@ -172,7 +172,7 @@ class TDTestCase:
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s where (sin(c2) >= 0) and (c1 %% 4 == 0) and (ts >= %d) and (t4 like 'shanghai')"%(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+math.ceil(self.rowsPerTbl/5))
queryString = "select ts, acos(c1), ceil(pow(c1,3)) from %s.%s where (sin(c2) >= 0) and (c1 %% 4 != 0) and (ts+1a >= %d) and (t4 like '%%shanghai')"%(paraDict['dbName'], paraDict['stbName'], paraDict["startTs"]+math.ceil(self.rowsPerTbl/10))
# queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册