提交 64460a95 编写于 作者: H Hongze Cheng

Merge branch 'feat/tsdb_refact' of https://github.com/taosdata/TDengine into feat/tsdb_refact

...@@ -73,194 +73,195 @@ ...@@ -73,194 +73,195 @@
#define TK_MNODE 55 #define TK_MNODE 55
#define TK_DATABASE 56 #define TK_DATABASE 56
#define TK_USE 57 #define TK_USE 57
#define TK_IF 58 #define TK_FLUSH 58
#define TK_NOT 59 #define TK_IF 59
#define TK_EXISTS 60 #define TK_NOT 60
#define TK_BUFFER 61 #define TK_EXISTS 61
#define TK_CACHELAST 62 #define TK_BUFFER 62
#define TK_COMP 63 #define TK_CACHELAST 63
#define TK_DURATION 64 #define TK_COMP 64
#define TK_NK_VARIABLE 65 #define TK_DURATION 65
#define TK_FSYNC 66 #define TK_NK_VARIABLE 66
#define TK_MAXROWS 67 #define TK_FSYNC 67
#define TK_MINROWS 68 #define TK_MAXROWS 68
#define TK_KEEP 69 #define TK_MINROWS 69
#define TK_PAGES 70 #define TK_KEEP 70
#define TK_PAGESIZE 71 #define TK_PAGES 71
#define TK_PRECISION 72 #define TK_PAGESIZE 72
#define TK_REPLICA 73 #define TK_PRECISION 73
#define TK_STRICT 74 #define TK_REPLICA 74
#define TK_WAL 75 #define TK_STRICT 75
#define TK_VGROUPS 76 #define TK_WAL 76
#define TK_SINGLE_STABLE 77 #define TK_VGROUPS 77
#define TK_RETENTIONS 78 #define TK_SINGLE_STABLE 78
#define TK_SCHEMALESS 79 #define TK_RETENTIONS 79
#define TK_NK_COLON 80 #define TK_SCHEMALESS 80
#define TK_TABLE 81 #define TK_NK_COLON 81
#define TK_NK_LP 82 #define TK_TABLE 82
#define TK_NK_RP 83 #define TK_NK_LP 83
#define TK_STABLE 84 #define TK_NK_RP 84
#define TK_ADD 85 #define TK_STABLE 85
#define TK_COLUMN 86 #define TK_ADD 86
#define TK_MODIFY 87 #define TK_COLUMN 87
#define TK_RENAME 88 #define TK_MODIFY 88
#define TK_TAG 89 #define TK_RENAME 89
#define TK_SET 90 #define TK_TAG 90
#define TK_NK_EQ 91 #define TK_SET 91
#define TK_USING 92 #define TK_NK_EQ 92
#define TK_TAGS 93 #define TK_USING 93
#define TK_COMMENT 94 #define TK_TAGS 94
#define TK_BOOL 95 #define TK_COMMENT 95
#define TK_TINYINT 96 #define TK_BOOL 96
#define TK_SMALLINT 97 #define TK_TINYINT 97
#define TK_INT 98 #define TK_SMALLINT 98
#define TK_INTEGER 99 #define TK_INT 99
#define TK_BIGINT 100 #define TK_INTEGER 100
#define TK_FLOAT 101 #define TK_BIGINT 101
#define TK_DOUBLE 102 #define TK_FLOAT 102
#define TK_BINARY 103 #define TK_DOUBLE 103
#define TK_TIMESTAMP 104 #define TK_BINARY 104
#define TK_NCHAR 105 #define TK_TIMESTAMP 105
#define TK_UNSIGNED 106 #define TK_NCHAR 106
#define TK_JSON 107 #define TK_UNSIGNED 107
#define TK_VARCHAR 108 #define TK_JSON 108
#define TK_MEDIUMBLOB 109 #define TK_VARCHAR 109
#define TK_BLOB 110 #define TK_MEDIUMBLOB 110
#define TK_VARBINARY 111 #define TK_BLOB 111
#define TK_DECIMAL 112 #define TK_VARBINARY 112
#define TK_MAX_DELAY 113 #define TK_DECIMAL 113
#define TK_WATERMARK 114 #define TK_MAX_DELAY 114
#define TK_ROLLUP 115 #define TK_WATERMARK 115
#define TK_TTL 116 #define TK_ROLLUP 116
#define TK_SMA 117 #define TK_TTL 117
#define TK_FIRST 118 #define TK_SMA 118
#define TK_LAST 119 #define TK_FIRST 119
#define TK_SHOW 120 #define TK_LAST 120
#define TK_DATABASES 121 #define TK_SHOW 121
#define TK_TABLES 122 #define TK_DATABASES 122
#define TK_STABLES 123 #define TK_TABLES 123
#define TK_MNODES 124 #define TK_STABLES 124
#define TK_MODULES 125 #define TK_MNODES 125
#define TK_QNODES 126 #define TK_MODULES 126
#define TK_FUNCTIONS 127 #define TK_QNODES 127
#define TK_INDEXES 128 #define TK_FUNCTIONS 128
#define TK_ACCOUNTS 129 #define TK_INDEXES 129
#define TK_APPS 130 #define TK_ACCOUNTS 130
#define TK_CONNECTIONS 131 #define TK_APPS 131
#define TK_LICENCE 132 #define TK_CONNECTIONS 132
#define TK_GRANTS 133 #define TK_LICENCE 133
#define TK_QUERIES 134 #define TK_GRANTS 134
#define TK_SCORES 135 #define TK_QUERIES 135
#define TK_TOPICS 136 #define TK_SCORES 136
#define TK_VARIABLES 137 #define TK_TOPICS 137
#define TK_BNODES 138 #define TK_VARIABLES 138
#define TK_SNODES 139 #define TK_BNODES 139
#define TK_CLUSTER 140 #define TK_SNODES 140
#define TK_TRANSACTIONS 141 #define TK_CLUSTER 141
#define TK_DISTRIBUTED 142 #define TK_TRANSACTIONS 142
#define TK_CONSUMERS 143 #define TK_DISTRIBUTED 143
#define TK_SUBSCRIPTIONS 144 #define TK_CONSUMERS 144
#define TK_LIKE 145 #define TK_SUBSCRIPTIONS 145
#define TK_INDEX 146 #define TK_LIKE 146
#define TK_FUNCTION 147 #define TK_INDEX 147
#define TK_INTERVAL 148 #define TK_FUNCTION 148
#define TK_TOPIC 149 #define TK_INTERVAL 149
#define TK_AS 150 #define TK_TOPIC 150
#define TK_WITH 151 #define TK_AS 151
#define TK_META 152 #define TK_WITH 152
#define TK_CONSUMER 153 #define TK_META 153
#define TK_GROUP 154 #define TK_CONSUMER 154
#define TK_DESC 155 #define TK_GROUP 155
#define TK_DESCRIBE 156 #define TK_DESC 156
#define TK_RESET 157 #define TK_DESCRIBE 157
#define TK_QUERY 158 #define TK_RESET 158
#define TK_CACHE 159 #define TK_QUERY 159
#define TK_EXPLAIN 160 #define TK_CACHE 160
#define TK_ANALYZE 161 #define TK_EXPLAIN 161
#define TK_VERBOSE 162 #define TK_ANALYZE 162
#define TK_NK_BOOL 163 #define TK_VERBOSE 163
#define TK_RATIO 164 #define TK_NK_BOOL 164
#define TK_NK_FLOAT 165 #define TK_RATIO 165
#define TK_COMPACT 166 #define TK_NK_FLOAT 166
#define TK_VNODES 167 #define TK_COMPACT 167
#define TK_IN 168 #define TK_VNODES 168
#define TK_OUTPUTTYPE 169 #define TK_IN 169
#define TK_AGGREGATE 170 #define TK_OUTPUTTYPE 170
#define TK_BUFSIZE 171 #define TK_AGGREGATE 171
#define TK_STREAM 172 #define TK_BUFSIZE 172
#define TK_INTO 173 #define TK_STREAM 173
#define TK_TRIGGER 174 #define TK_INTO 174
#define TK_AT_ONCE 175 #define TK_TRIGGER 175
#define TK_WINDOW_CLOSE 176 #define TK_AT_ONCE 176
#define TK_KILL 177 #define TK_WINDOW_CLOSE 177
#define TK_CONNECTION 178 #define TK_KILL 178
#define TK_TRANSACTION 179 #define TK_CONNECTION 179
#define TK_BALANCE 180 #define TK_TRANSACTION 180
#define TK_VGROUP 181 #define TK_BALANCE 181
#define TK_MERGE 182 #define TK_VGROUP 182
#define TK_REDISTRIBUTE 183 #define TK_MERGE 183
#define TK_SPLIT 184 #define TK_REDISTRIBUTE 184
#define TK_SYNCDB 185 #define TK_SPLIT 185
#define TK_DELETE 186 #define TK_SYNCDB 186
#define TK_NULL 187 #define TK_DELETE 187
#define TK_NK_QUESTION 188 #define TK_NULL 188
#define TK_NK_ARROW 189 #define TK_NK_QUESTION 189
#define TK_ROWTS 190 #define TK_NK_ARROW 190
#define TK_TBNAME 191 #define TK_ROWTS 191
#define TK_QSTARTTS 192 #define TK_TBNAME 192
#define TK_QENDTS 193 #define TK_QSTARTTS 193
#define TK_WSTARTTS 194 #define TK_QENDTS 194
#define TK_WENDTS 195 #define TK_WSTARTTS 195
#define TK_WDURATION 196 #define TK_WENDTS 196
#define TK_CAST 197 #define TK_WDURATION 197
#define TK_NOW 198 #define TK_CAST 198
#define TK_TODAY 199 #define TK_NOW 199
#define TK_TIMEZONE 200 #define TK_TODAY 200
#define TK_COUNT 201 #define TK_TIMEZONE 201
#define TK_LAST_ROW 202 #define TK_COUNT 202
#define TK_BETWEEN 203 #define TK_LAST_ROW 203
#define TK_IS 204 #define TK_BETWEEN 204
#define TK_NK_LT 205 #define TK_IS 205
#define TK_NK_GT 206 #define TK_NK_LT 206
#define TK_NK_LE 207 #define TK_NK_GT 207
#define TK_NK_GE 208 #define TK_NK_LE 208
#define TK_NK_NE 209 #define TK_NK_GE 209
#define TK_MATCH 210 #define TK_NK_NE 210
#define TK_NMATCH 211 #define TK_MATCH 211
#define TK_CONTAINS 212 #define TK_NMATCH 212
#define TK_JOIN 213 #define TK_CONTAINS 213
#define TK_INNER 214 #define TK_JOIN 214
#define TK_SELECT 215 #define TK_INNER 215
#define TK_DISTINCT 216 #define TK_SELECT 216
#define TK_WHERE 217 #define TK_DISTINCT 217
#define TK_PARTITION 218 #define TK_WHERE 218
#define TK_BY 219 #define TK_PARTITION 219
#define TK_SESSION 220 #define TK_BY 220
#define TK_STATE_WINDOW 221 #define TK_SESSION 221
#define TK_SLIDING 222 #define TK_STATE_WINDOW 222
#define TK_FILL 223 #define TK_SLIDING 223
#define TK_VALUE 224 #define TK_FILL 224
#define TK_NONE 225 #define TK_VALUE 225
#define TK_PREV 226 #define TK_NONE 226
#define TK_LINEAR 227 #define TK_PREV 227
#define TK_NEXT 228 #define TK_LINEAR 228
#define TK_HAVING 229 #define TK_NEXT 229
#define TK_RANGE 230 #define TK_HAVING 230
#define TK_EVERY 231 #define TK_RANGE 231
#define TK_ORDER 232 #define TK_EVERY 232
#define TK_SLIMIT 233 #define TK_ORDER 233
#define TK_SOFFSET 234 #define TK_SLIMIT 234
#define TK_LIMIT 235 #define TK_SOFFSET 235
#define TK_OFFSET 236 #define TK_LIMIT 236
#define TK_ASC 237 #define TK_OFFSET 237
#define TK_NULLS 238 #define TK_ASC 238
#define TK_ID 239 #define TK_NULLS 239
#define TK_NK_BITNOT 240 #define TK_ID 240
#define TK_INSERT 241 #define TK_NK_BITNOT 241
#define TK_VALUES 242 #define TK_INSERT 242
#define TK_IMPORT 243 #define TK_VALUES 243
#define TK_NK_SEMI 244 #define TK_IMPORT 244
#define TK_FILE 245 #define TK_NK_SEMI 245
#define TK_FILE 246
#define TK_NK_SPACE 300 #define TK_NK_SPACE 300
#define TK_NK_COMMENT 301 #define TK_NK_COMMENT 301
......
...@@ -97,6 +97,11 @@ typedef struct SAlterDatabaseStmt { ...@@ -97,6 +97,11 @@ typedef struct SAlterDatabaseStmt {
SDatabaseOptions* pOptions; SDatabaseOptions* pOptions;
} SAlterDatabaseStmt; } SAlterDatabaseStmt;
typedef struct SFlushDatabaseStmt {
ENodeType type;
char dbName[TSDB_DB_NAME_LEN];
} SFlushDatabaseStmt;
typedef struct STableOptions { typedef struct STableOptions {
ENodeType type; ENodeType type;
bool commentNull; bool commentNull;
......
...@@ -111,6 +111,7 @@ typedef enum ENodeType { ...@@ -111,6 +111,7 @@ typedef enum ENodeType {
QUERY_NODE_CREATE_DATABASE_STMT, QUERY_NODE_CREATE_DATABASE_STMT,
QUERY_NODE_DROP_DATABASE_STMT, QUERY_NODE_DROP_DATABASE_STMT,
QUERY_NODE_ALTER_DATABASE_STMT, QUERY_NODE_ALTER_DATABASE_STMT,
QUERY_NODE_FLUSH_DATABASE_STMT,
QUERY_NODE_CREATE_TABLE_STMT, QUERY_NODE_CREATE_TABLE_STMT,
QUERY_NODE_CREATE_SUBTABLE_CLAUSE, QUERY_NODE_CREATE_SUBTABLE_CLAUSE,
QUERY_NODE_CREATE_MULTI_TABLE_STMT, QUERY_NODE_CREATE_MULTI_TABLE_STMT,
......
...@@ -1087,7 +1087,8 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH ...@@ -1087,7 +1087,8 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
code = mergeLastRow(uid, pTsdb, &pRow); code = mergeLastRow(uid, pTsdb, &pRow);
// if table's empty or error, return code of -1 // if table's empty or error, return code of -1
if (code < 0 || pRow == NULL) { if (code < 0 || pRow == NULL) {
return -1; *handle = NULL;
return 0;
} }
tsdbCacheInsertLastrow(pCache, uid, pRow); tsdbCacheInsertLastrow(pCache, uid, pRow);
...@@ -1116,7 +1117,8 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand ...@@ -1116,7 +1117,8 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand
code = mergeLast(uid, pTsdb, &pRow); code = mergeLast(uid, pTsdb, &pRow);
// if table's empty or error, return code of -1 // if table's empty or error, return code of -1
if (code < 0 || pRow == NULL) { if (code < 0 || pRow == NULL) {
return -1; *handle = NULL;
return 0;
} }
tsdbCacheInsertLast(pCache, uid, pRow); tsdbCacheInsertLast(pCache, uid, pRow);
......
...@@ -22,7 +22,7 @@ typedef struct SLastrowReader { ...@@ -22,7 +22,7 @@ typedef struct SLastrowReader {
SVnode* pVnode; SVnode* pVnode;
STSchema* pSchema; STSchema* pSchema;
uint64_t uid; uint64_t uid;
// int32_t* pSlotIds; // int32_t* pSlotIds;
char** transferBuf; // todo remove it soon char** transferBuf; // todo remove it soon
int32_t numOfCols; int32_t numOfCols;
int32_t type; int32_t type;
...@@ -30,7 +30,7 @@ typedef struct SLastrowReader { ...@@ -30,7 +30,7 @@ typedef struct SLastrowReader {
SArray* pTableList; // table id list SArray* pTableList; // table id list
} SLastrowReader; } SLastrowReader;
static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t *slotIds) { static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t* slotIds) {
int32_t numOfRows = pBlock->info.rows; int32_t numOfRows = pBlock->info.rows;
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
...@@ -60,8 +60,8 @@ static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReade ...@@ -60,8 +60,8 @@ static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReade
pBlock->info.rows += 1; pBlock->info.rows += 1;
} }
int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t* colId, int32_t numOfCols,
int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t* colId, int32_t numOfCols, void** pReader) { void** pReader) {
SLastrowReader* p = taosMemoryCalloc(1, sizeof(SLastrowReader)); SLastrowReader* p = taosMemoryCalloc(1, sizeof(SLastrowReader));
if (p == NULL) { if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -101,7 +101,7 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, ...@@ -101,7 +101,7 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList,
int32_t tsdbLastrowReaderClose(void* pReader) { int32_t tsdbLastrowReaderClose(void* pReader) {
SLastrowReader* p = pReader; SLastrowReader* p = pReader;
for(int32_t i = 0; i < p->numOfCols; ++i) { for (int32_t i = 0; i < p->numOfCols; ++i) {
taosMemoryFreeClear(p->transferBuf[i]); taosMemoryFreeClear(p->transferBuf[i]);
} }
...@@ -117,6 +117,7 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t ...@@ -117,6 +117,7 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
SLastrowReader* pr = pReader; SLastrowReader* pr = pReader;
LRUHandle* h = NULL;
STSRow* pRow = NULL; STSRow* pRow = NULL;
size_t numOfTables = taosArrayGetSize(pr->pTableList); size_t numOfTables = taosArrayGetSize(pr->pTableList);
...@@ -127,15 +128,17 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t ...@@ -127,15 +128,17 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
for (int32_t i = 0; i < numOfTables; ++i) { for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow); /* int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow); */
int32_t code = tsdbCacheGetLastrowH(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
if (pRow == NULL) { if (h == NULL) {
continue; continue;
} }
pRow = (STSRow*)taosLRUCacheValue(pr->pVnode->pTsdb->lruCache, h);
if (pRow->ts > lastKey) { if (pRow->ts > lastKey) {
// Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already // Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already
// appended or not. // appended or not.
...@@ -147,23 +150,29 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t ...@@ -147,23 +150,29 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
internalResult = true; internalResult = true;
lastKey = pRow->ts; lastKey = pRow->ts;
} }
tsdbCacheRelease(pr->pVnode->pTsdb->lruCache, h);
} }
} else if (pr->type == LASTROW_RETRIEVE_TYPE_ALL) { } else if (pr->type == LASTROW_RETRIEVE_TYPE_ALL) {
for (int32_t i = pr->tableIndex; i < numOfTables; ++i) { for (int32_t i = pr->tableIndex; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow); /* int32_t code = tsdbCacheGetLastrow(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &pRow); */
int32_t code = tsdbCacheGetLastrowH(pr->pVnode->pTsdb->lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
// no data in the table of Uid // no data in the table of Uid
if (pRow == NULL) { if (h == NULL) {
continue; continue;
} }
pRow = (STSRow*)taosLRUCacheValue(pr->pVnode->pTsdb->lruCache, h);
saveOneRow(pRow, pResBlock, pr, slotIds); saveOneRow(pRow, pResBlock, pr, slotIds);
tsdbCacheRelease(pr->pVnode->pTsdb->lruCache, h);
pr->tableIndex += 1; pr->tableIndex += 1;
if (pResBlock->info.rows >= pResBlock->info.capacity) { if (pResBlock->info.rows >= pResBlock->info.capacity) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -102,6 +102,8 @@ SNode* nodesMakeNode(ENodeType type) { ...@@ -102,6 +102,8 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SDropDatabaseStmt)); return makeNode(type, sizeof(SDropDatabaseStmt));
case QUERY_NODE_ALTER_DATABASE_STMT: case QUERY_NODE_ALTER_DATABASE_STMT:
return makeNode(type, sizeof(SAlterDatabaseStmt)); return makeNode(type, sizeof(SAlterDatabaseStmt));
case QUERY_NODE_FLUSH_DATABASE_STMT:
return makeNode(type, sizeof(SFlushDatabaseStmt));
case QUERY_NODE_CREATE_TABLE_STMT: case QUERY_NODE_CREATE_TABLE_STMT:
return makeNode(type, sizeof(SCreateTableStmt)); return makeNode(type, sizeof(SCreateTableStmt));
case QUERY_NODE_CREATE_SUBTABLE_CLAUSE: case QUERY_NODE_CREATE_SUBTABLE_CLAUSE:
...@@ -540,6 +542,8 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -540,6 +542,8 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_ALTER_DATABASE_STMT: case QUERY_NODE_ALTER_DATABASE_STMT:
nodesDestroyNode((SNode*)((SAlterDatabaseStmt*)pNode)->pOptions); nodesDestroyNode((SNode*)((SAlterDatabaseStmt*)pNode)->pOptions);
break; break;
case QUERY_NODE_FLUSH_DATABASE_STMT: // no pointer field
break;
case QUERY_NODE_CREATE_TABLE_STMT: { case QUERY_NODE_CREATE_TABLE_STMT: {
SCreateTableStmt* pStmt = (SCreateTableStmt*)pNode; SCreateTableStmt* pStmt = (SCreateTableStmt*)pNode;
nodesDestroyList(pStmt->pCols); nodesDestroyList(pStmt->pCols);
......
...@@ -135,6 +135,7 @@ SNode* setAlterDatabaseOption(SAstCreateContext* pCxt, SNode* pOptions, SAlterOp ...@@ -135,6 +135,7 @@ SNode* setAlterDatabaseOption(SAstCreateContext* pCxt, SNode* pOptions, SAlterOp
SNode* createCreateDatabaseStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pDbName, SNode* pOptions); SNode* createCreateDatabaseStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pDbName, SNode* pOptions);
SNode* createDropDatabaseStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pDbName); SNode* createDropDatabaseStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pDbName);
SNode* createAlterDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName, SNode* pOptions); SNode* createAlterDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName, SNode* pOptions);
SNode* createFlushDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName);
SNode* createDefaultTableOptions(SAstCreateContext* pCxt); SNode* createDefaultTableOptions(SAstCreateContext* pCxt);
SNode* createAlterTableOptions(SAstCreateContext* pCxt); SNode* createAlterTableOptions(SAstCreateContext* pCxt);
SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType type, void* pVal); SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType type, void* pVal);
......
...@@ -157,6 +157,7 @@ cmd ::= CREATE DATABASE not_exists_opt(A) db_name(B) db_options(C). ...@@ -157,6 +157,7 @@ cmd ::= CREATE DATABASE not_exists_opt(A) db_name(B) db_options(C).
cmd ::= DROP DATABASE exists_opt(A) db_name(B). { pCxt->pRootNode = createDropDatabaseStmt(pCxt, A, &B); } cmd ::= DROP DATABASE exists_opt(A) db_name(B). { pCxt->pRootNode = createDropDatabaseStmt(pCxt, A, &B); }
cmd ::= USE db_name(A). { pCxt->pRootNode = createUseDatabaseStmt(pCxt, &A); } cmd ::= USE db_name(A). { pCxt->pRootNode = createUseDatabaseStmt(pCxt, &A); }
cmd ::= ALTER DATABASE db_name(A) alter_db_options(B). { pCxt->pRootNode = createAlterDatabaseStmt(pCxt, &A, B); } cmd ::= ALTER DATABASE db_name(A) alter_db_options(B). { pCxt->pRootNode = createAlterDatabaseStmt(pCxt, &A, B); }
cmd ::= FLUSH DATABASE db_name(A). { pCxt->pRootNode = createFlushDatabaseStmt(pCxt, &A); }
%type not_exists_opt { bool } %type not_exists_opt { bool }
%destructor not_exists_opt { } %destructor not_exists_opt { }
......
...@@ -903,6 +903,17 @@ SNode* createAlterDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName, SNode* ...@@ -903,6 +903,17 @@ SNode* createAlterDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName, SNode*
return (SNode*)pStmt; return (SNode*)pStmt;
} }
SNode* createFlushDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName) {
CHECK_PARSER_STATUS(pCxt);
if (!checkDbName(pCxt, pDbName, false)) {
return NULL;
}
SAlterDatabaseStmt* pStmt = (SAlterDatabaseStmt*)nodesMakeNode(QUERY_NODE_FLUSH_DATABASE_STMT);
CHECK_OUT_OF_MEM(pStmt);
COPY_STRING_FORM_ID_TOKEN(pStmt->dbName, pDbName);
return (SNode*)pStmt;
}
SNode* createDefaultTableOptions(SAstCreateContext* pCxt) { SNode* createDefaultTableOptions(SAstCreateContext* pCxt) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
STableOptions* pOptions = (STableOptions*)nodesMakeNode(QUERY_NODE_TABLE_OPTIONS); STableOptions* pOptions = (STableOptions*)nodesMakeNode(QUERY_NODE_TABLE_OPTIONS);
......
...@@ -195,6 +195,10 @@ static int32_t collectMetaKeyFromAlterDatabase(SCollectMetaKeyCxt* pCxt, SAlterD ...@@ -195,6 +195,10 @@ static int32_t collectMetaKeyFromAlterDatabase(SCollectMetaKeyCxt* pCxt, SAlterD
return reserveDbCfgInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache); return reserveDbCfgInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
} }
static int32_t collectMetaKeyFromFlushDatabase(SCollectMetaKeyCxt* pCxt, SFlushDatabaseStmt* pStmt) {
return reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromCreateTable(SCollectMetaKeyCxt* pCxt, SCreateTableStmt* pStmt) { static int32_t collectMetaKeyFromCreateTable(SCollectMetaKeyCxt* pCxt, SCreateTableStmt* pStmt) {
int32_t code = reserveDbCfgInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache); int32_t code = reserveDbCfgInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code && NULL == pStmt->pTags) { if (TSDB_CODE_SUCCESS == code && NULL == pStmt->pTags) {
...@@ -475,6 +479,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { ...@@ -475,6 +479,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromSelect(pCxt, (SSelectStmt*)pStmt); return collectMetaKeyFromSelect(pCxt, (SSelectStmt*)pStmt);
case QUERY_NODE_ALTER_DATABASE_STMT: case QUERY_NODE_ALTER_DATABASE_STMT:
return collectMetaKeyFromAlterDatabase(pCxt, (SAlterDatabaseStmt*)pStmt); return collectMetaKeyFromAlterDatabase(pCxt, (SAlterDatabaseStmt*)pStmt);
case QUERY_NODE_FLUSH_DATABASE_STMT:
return collectMetaKeyFromFlushDatabase(pCxt, (SFlushDatabaseStmt*)pStmt);
case QUERY_NODE_CREATE_TABLE_STMT: case QUERY_NODE_CREATE_TABLE_STMT:
return collectMetaKeyFromCreateTable(pCxt, (SCreateTableStmt*)pStmt); return collectMetaKeyFromCreateTable(pCxt, (SCreateTableStmt*)pStmt);
case QUERY_NODE_CREATE_MULTI_TABLE_STMT: case QUERY_NODE_CREATE_MULTI_TABLE_STMT:
......
...@@ -87,6 +87,7 @@ static SKeyword keywordTable[] = { ...@@ -87,6 +87,7 @@ static SKeyword keywordTable[] = {
{"FILL", TK_FILL}, {"FILL", TK_FILL},
{"FIRST", TK_FIRST}, {"FIRST", TK_FIRST},
{"FLOAT", TK_FLOAT}, {"FLOAT", TK_FLOAT},
{"FLUSH", TK_FLUSH},
{"FROM", TK_FROM}, {"FROM", TK_FROM},
{"FSYNC", TK_FSYNC}, {"FSYNC", TK_FSYNC},
{"FUNCTION", TK_FUNCTION}, {"FUNCTION", TK_FUNCTION},
......
...@@ -5863,6 +5863,67 @@ static int32_t rewriteAlterTable(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -5863,6 +5863,67 @@ static int32_t rewriteAlterTable(STranslateContext* pCxt, SQuery* pQuery) {
return code; return code;
} }
static int32_t serializeFlushVgroup(SVgroupInfo* pVg, SArray* pBufArray) {
int32_t len = sizeof(SMsgHead);
void* buf = taosMemoryMalloc(len);
if (NULL == buf) {
return TSDB_CODE_OUT_OF_MEMORY;
}
((SMsgHead*)buf)->vgId = htonl(pVg->vgId);
((SMsgHead*)buf)->contLen = htonl(len);
SVgDataBlocks* pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
if (NULL == pVgData) {
taosMemoryFree(buf);
return TSDB_CODE_OUT_OF_MEMORY;
}
pVgData->vg = *pVg;
pVgData->pData = buf;
pVgData->size = len;
taosArrayPush(pBufArray, &pVgData);
return TSDB_CODE_SUCCESS;
}
static int32_t serializeFlushDb(SArray* pVgs, SArray** pOutput) {
int32_t numOfVgs = taosArrayGetSize(pVgs);
SArray* pBufArray = taosArrayInit(numOfVgs, sizeof(void*));
if (NULL == pBufArray) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < numOfVgs; ++i) {
int32_t code = serializeFlushVgroup((SVgroupInfo*)taosArrayGet(pVgs, i), pBufArray);
if (TSDB_CODE_SUCCESS != code) {
taosArrayDestroy(pBufArray);
return code;
}
}
*pOutput = pBufArray;
return TSDB_CODE_SUCCESS;
}
static int32_t rewriteFlushDatabase(STranslateContext* pCxt, SQuery* pQuery) {
SFlushDatabaseStmt* pStmt = (SFlushDatabaseStmt*)pQuery->pRoot;
SArray* pBufArray = NULL;
SArray* pVgs = NULL;
int32_t code = getDBVgInfo(pCxt, pStmt->dbName, &pVgs);
if (TSDB_CODE_SUCCESS == code) {
code = serializeFlushDb(pVgs, &pBufArray);
}
if (TSDB_CODE_SUCCESS == code) {
code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
}
if (TSDB_CODE_SUCCESS != code) {
taosArrayDestroy(pBufArray);
}
taosArrayDestroy(pVgs);
return code;
}
static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) { static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pQuery->pRoot)) { switch (nodeType(pQuery->pRoot)) {
...@@ -5911,6 +5972,9 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) { ...@@ -5911,6 +5972,9 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
case QUERY_NODE_ALTER_TABLE_STMT: case QUERY_NODE_ALTER_TABLE_STMT:
code = rewriteAlterTable(pCxt, pQuery); code = rewriteAlterTable(pCxt, pQuery);
break; break;
case QUERY_NODE_FLUSH_DATABASE_STMT:
code = rewriteFlushDatabase(pCxt, pQuery);
break;
default: default:
break; break;
} }
......
此差异已折叠。
...@@ -1101,6 +1101,8 @@ static int32_t getMsgType(ENodeType sqlType) { ...@@ -1101,6 +1101,8 @@ static int32_t getMsgType(ENodeType sqlType) {
return TDMT_VND_DROP_TABLE; return TDMT_VND_DROP_TABLE;
case QUERY_NODE_ALTER_TABLE_STMT: case QUERY_NODE_ALTER_TABLE_STMT:
return TDMT_VND_ALTER_TABLE; return TDMT_VND_ALTER_TABLE;
case QUERY_NODE_FLUSH_DATABASE_STMT:
return TDMT_VND_COMMIT;
default: default:
break; break;
} }
......
...@@ -448,6 +448,10 @@ int32_t schHandleDropCallback(void *param, const SDataBuf *pMsg, int32_t code) { ...@@ -448,6 +448,10 @@ int32_t schHandleDropCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schHandleCommitCallback(void *param, const SDataBuf *pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_COMMIT_RSP, code);
}
int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t code) { int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t code) {
SSchCallbackParamHeader *head = (SSchCallbackParamHeader *)param; SSchCallbackParamHeader *head = (SSchCallbackParamHeader *)param;
rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT);
...@@ -586,6 +590,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { ...@@ -586,6 +590,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
case TDMT_VND_QUERY_HEARTBEAT: case TDMT_VND_QUERY_HEARTBEAT:
*fp = schHandleHbCallback; *fp = schHandleHbCallback;
break; break;
case TDMT_VND_COMMIT:
*fp = schHandleCommitCallback;
break;
case TDMT_SCH_LINK_BROKEN: case TDMT_SCH_LINK_BROKEN:
*fp = schHandleLinkBrokenCallback; *fp = schHandleLinkBrokenCallback;
break; break;
...@@ -1000,7 +1007,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, ...@@ -1000,7 +1007,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
case TDMT_VND_CREATE_TABLE: case TDMT_VND_CREATE_TABLE:
case TDMT_VND_DROP_TABLE: case TDMT_VND_DROP_TABLE:
case TDMT_VND_ALTER_TABLE: case TDMT_VND_ALTER_TABLE:
case TDMT_VND_SUBMIT: { case TDMT_VND_SUBMIT:
case TDMT_VND_COMMIT: {
msgSize = pTask->msgLen; msgSize = pTask->msgLen;
msg = taosMemoryCalloc(1, msgSize); msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) { if (NULL == msg) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册