提交 b02e2ff9 编写于 作者: S slzhou

Merge branch '3.0' of github.com:taosdata/TDengine into szhou/fixbugs

......@@ -1478,6 +1478,7 @@ typedef struct {
int32_t dnodeId;
char fqdn[TSDB_FQDN_LEN];
int32_t port;
int8_t force;
} SDropDnodeReq;
int32_t tSerializeSDropDnodeReq(void* buf, int32_t bufLen, SDropDnodeReq* pReq);
......
......@@ -66,280 +66,281 @@
#define TK_PORT 48
#define TK_DNODES 49
#define TK_NK_IPTOKEN 50
#define TK_LOCAL 51
#define TK_QNODE 52
#define TK_BNODE 53
#define TK_SNODE 54
#define TK_MNODE 55
#define TK_DATABASE 56
#define TK_USE 57
#define TK_FLUSH 58
#define TK_TRIM 59
#define TK_IF 60
#define TK_NOT 61
#define TK_EXISTS 62
#define TK_BUFFER 63
#define TK_CACHEMODEL 64
#define TK_CACHESIZE 65
#define TK_COMP 66
#define TK_DURATION 67
#define TK_NK_VARIABLE 68
#define TK_MAXROWS 69
#define TK_MINROWS 70
#define TK_KEEP 71
#define TK_PAGES 72
#define TK_PAGESIZE 73
#define TK_TSDB_PAGESIZE 74
#define TK_PRECISION 75
#define TK_REPLICA 76
#define TK_STRICT 77
#define TK_VGROUPS 78
#define TK_SINGLE_STABLE 79
#define TK_RETENTIONS 80
#define TK_SCHEMALESS 81
#define TK_WAL_LEVEL 82
#define TK_WAL_FSYNC_PERIOD 83
#define TK_WAL_RETENTION_PERIOD 84
#define TK_WAL_RETENTION_SIZE 85
#define TK_WAL_ROLL_PERIOD 86
#define TK_WAL_SEGMENT_SIZE 87
#define TK_STT_TRIGGER 88
#define TK_TABLE_PREFIX 89
#define TK_TABLE_SUFFIX 90
#define TK_NK_COLON 91
#define TK_MAX_SPEED 92
#define TK_TABLE 93
#define TK_NK_LP 94
#define TK_NK_RP 95
#define TK_STABLE 96
#define TK_ADD 97
#define TK_COLUMN 98
#define TK_MODIFY 99
#define TK_RENAME 100
#define TK_TAG 101
#define TK_SET 102
#define TK_NK_EQ 103
#define TK_USING 104
#define TK_TAGS 105
#define TK_COMMENT 106
#define TK_BOOL 107
#define TK_TINYINT 108
#define TK_SMALLINT 109
#define TK_INT 110
#define TK_INTEGER 111
#define TK_BIGINT 112
#define TK_FLOAT 113
#define TK_DOUBLE 114
#define TK_BINARY 115
#define TK_TIMESTAMP 116
#define TK_NCHAR 117
#define TK_UNSIGNED 118
#define TK_JSON 119
#define TK_VARCHAR 120
#define TK_MEDIUMBLOB 121
#define TK_BLOB 122
#define TK_VARBINARY 123
#define TK_DECIMAL 124
#define TK_MAX_DELAY 125
#define TK_WATERMARK 126
#define TK_ROLLUP 127
#define TK_TTL 128
#define TK_SMA 129
#define TK_FIRST 130
#define TK_LAST 131
#define TK_SHOW 132
#define TK_DATABASES 133
#define TK_TABLES 134
#define TK_STABLES 135
#define TK_MNODES 136
#define TK_QNODES 137
#define TK_FUNCTIONS 138
#define TK_INDEXES 139
#define TK_ACCOUNTS 140
#define TK_APPS 141
#define TK_CONNECTIONS 142
#define TK_LICENCES 143
#define TK_GRANTS 144
#define TK_QUERIES 145
#define TK_SCORES 146
#define TK_TOPICS 147
#define TK_VARIABLES 148
#define TK_CLUSTER 149
#define TK_BNODES 150
#define TK_SNODES 151
#define TK_TRANSACTIONS 152
#define TK_DISTRIBUTED 153
#define TK_CONSUMERS 154
#define TK_SUBSCRIPTIONS 155
#define TK_VNODES 156
#define TK_LIKE 157
#define TK_INDEX 158
#define TK_FUNCTION 159
#define TK_INTERVAL 160
#define TK_TOPIC 161
#define TK_AS 162
#define TK_WITH 163
#define TK_META 164
#define TK_CONSUMER 165
#define TK_GROUP 166
#define TK_DESC 167
#define TK_DESCRIBE 168
#define TK_RESET 169
#define TK_QUERY 170
#define TK_CACHE 171
#define TK_EXPLAIN 172
#define TK_ANALYZE 173
#define TK_VERBOSE 174
#define TK_NK_BOOL 175
#define TK_RATIO 176
#define TK_NK_FLOAT 177
#define TK_OUTPUTTYPE 178
#define TK_AGGREGATE 179
#define TK_BUFSIZE 180
#define TK_STREAM 181
#define TK_INTO 182
#define TK_TRIGGER 183
#define TK_AT_ONCE 184
#define TK_WINDOW_CLOSE 185
#define TK_IGNORE 186
#define TK_EXPIRED 187
#define TK_FILL_HISTORY 188
#define TK_SUBTABLE 189
#define TK_KILL 190
#define TK_CONNECTION 191
#define TK_TRANSACTION 192
#define TK_BALANCE 193
#define TK_VGROUP 194
#define TK_MERGE 195
#define TK_REDISTRIBUTE 196
#define TK_SPLIT 197
#define TK_DELETE 198
#define TK_INSERT 199
#define TK_NULL 200
#define TK_NK_QUESTION 201
#define TK_NK_ARROW 202
#define TK_ROWTS 203
#define TK_TBNAME 204
#define TK_QSTART 205
#define TK_QEND 206
#define TK_QDURATION 207
#define TK_WSTART 208
#define TK_WEND 209
#define TK_WDURATION 210
#define TK_IROWTS 211
#define TK_QTAGS 212
#define TK_CAST 213
#define TK_NOW 214
#define TK_TODAY 215
#define TK_TIMEZONE 216
#define TK_CLIENT_VERSION 217
#define TK_SERVER_VERSION 218
#define TK_SERVER_STATUS 219
#define TK_CURRENT_USER 220
#define TK_COUNT 221
#define TK_LAST_ROW 222
#define TK_CASE 223
#define TK_END 224
#define TK_WHEN 225
#define TK_THEN 226
#define TK_ELSE 227
#define TK_BETWEEN 228
#define TK_IS 229
#define TK_NK_LT 230
#define TK_NK_GT 231
#define TK_NK_LE 232
#define TK_NK_GE 233
#define TK_NK_NE 234
#define TK_MATCH 235
#define TK_NMATCH 236
#define TK_CONTAINS 237
#define TK_IN 238
#define TK_JOIN 239
#define TK_INNER 240
#define TK_SELECT 241
#define TK_DISTINCT 242
#define TK_WHERE 243
#define TK_PARTITION 244
#define TK_BY 245
#define TK_SESSION 246
#define TK_STATE_WINDOW 247
#define TK_SLIDING 248
#define TK_FILL 249
#define TK_VALUE 250
#define TK_NONE 251
#define TK_PREV 252
#define TK_LINEAR 253
#define TK_NEXT 254
#define TK_HAVING 255
#define TK_RANGE 256
#define TK_EVERY 257
#define TK_ORDER 258
#define TK_SLIMIT 259
#define TK_SOFFSET 260
#define TK_LIMIT 261
#define TK_OFFSET 262
#define TK_ASC 263
#define TK_NULLS 264
#define TK_ABORT 265
#define TK_AFTER 266
#define TK_ATTACH 267
#define TK_BEFORE 268
#define TK_BEGIN 269
#define TK_BITAND 270
#define TK_BITNOT 271
#define TK_BITOR 272
#define TK_BLOCKS 273
#define TK_CHANGE 274
#define TK_COMMA 275
#define TK_COMPACT 276
#define TK_CONCAT 277
#define TK_CONFLICT 278
#define TK_COPY 279
#define TK_DEFERRED 280
#define TK_DELIMITERS 281
#define TK_DETACH 282
#define TK_DIVIDE 283
#define TK_DOT 284
#define TK_EACH 285
#define TK_FAIL 286
#define TK_FILE 287
#define TK_FOR 288
#define TK_GLOB 289
#define TK_ID 290
#define TK_IMMEDIATE 291
#define TK_IMPORT 292
#define TK_INITIALLY 293
#define TK_INSTEAD 294
#define TK_ISNULL 295
#define TK_KEY 296
#define TK_MODULES 297
#define TK_NK_BITNOT 298
#define TK_NK_SEMI 299
#define TK_NOTNULL 300
#define TK_OF 301
#define TK_PLUS 302
#define TK_PRIVILEGE 303
#define TK_RAISE 304
#define TK_REPLACE 305
#define TK_RESTRICT 306
#define TK_ROW 307
#define TK_SEMI 308
#define TK_STAR 309
#define TK_STATEMENT 310
#define TK_STRING 311
#define TK_TIMES 312
#define TK_UPDATE 313
#define TK_VALUES 314
#define TK_VARIABLE 315
#define TK_VIEW 316
#define TK_WAL 317
#define TK_FORCE 51
#define TK_LOCAL 52
#define TK_QNODE 53
#define TK_BNODE 54
#define TK_SNODE 55
#define TK_MNODE 56
#define TK_DATABASE 57
#define TK_USE 58
#define TK_FLUSH 59
#define TK_TRIM 60
#define TK_IF 61
#define TK_NOT 62
#define TK_EXISTS 63
#define TK_BUFFER 64
#define TK_CACHEMODEL 65
#define TK_CACHESIZE 66
#define TK_COMP 67
#define TK_DURATION 68
#define TK_NK_VARIABLE 69
#define TK_MAXROWS 70
#define TK_MINROWS 71
#define TK_KEEP 72
#define TK_PAGES 73
#define TK_PAGESIZE 74
#define TK_TSDB_PAGESIZE 75
#define TK_PRECISION 76
#define TK_REPLICA 77
#define TK_STRICT 78
#define TK_VGROUPS 79
#define TK_SINGLE_STABLE 80
#define TK_RETENTIONS 81
#define TK_SCHEMALESS 82
#define TK_WAL_LEVEL 83
#define TK_WAL_FSYNC_PERIOD 84
#define TK_WAL_RETENTION_PERIOD 85
#define TK_WAL_RETENTION_SIZE 86
#define TK_WAL_ROLL_PERIOD 87
#define TK_WAL_SEGMENT_SIZE 88
#define TK_STT_TRIGGER 89
#define TK_TABLE_PREFIX 90
#define TK_TABLE_SUFFIX 91
#define TK_NK_COLON 92
#define TK_MAX_SPEED 93
#define TK_TABLE 94
#define TK_NK_LP 95
#define TK_NK_RP 96
#define TK_STABLE 97
#define TK_ADD 98
#define TK_COLUMN 99
#define TK_MODIFY 100
#define TK_RENAME 101
#define TK_TAG 102
#define TK_SET 103
#define TK_NK_EQ 104
#define TK_USING 105
#define TK_TAGS 106
#define TK_COMMENT 107
#define TK_BOOL 108
#define TK_TINYINT 109
#define TK_SMALLINT 110
#define TK_INT 111
#define TK_INTEGER 112
#define TK_BIGINT 113
#define TK_FLOAT 114
#define TK_DOUBLE 115
#define TK_BINARY 116
#define TK_TIMESTAMP 117
#define TK_NCHAR 118
#define TK_UNSIGNED 119
#define TK_JSON 120
#define TK_VARCHAR 121
#define TK_MEDIUMBLOB 122
#define TK_BLOB 123
#define TK_VARBINARY 124
#define TK_DECIMAL 125
#define TK_MAX_DELAY 126
#define TK_WATERMARK 127
#define TK_ROLLUP 128
#define TK_TTL 129
#define TK_SMA 130
#define TK_FIRST 131
#define TK_LAST 132
#define TK_SHOW 133
#define TK_DATABASES 134
#define TK_TABLES 135
#define TK_STABLES 136
#define TK_MNODES 137
#define TK_QNODES 138
#define TK_FUNCTIONS 139
#define TK_INDEXES 140
#define TK_ACCOUNTS 141
#define TK_APPS 142
#define TK_CONNECTIONS 143
#define TK_LICENCES 144
#define TK_GRANTS 145
#define TK_QUERIES 146
#define TK_SCORES 147
#define TK_TOPICS 148
#define TK_VARIABLES 149
#define TK_CLUSTER 150
#define TK_BNODES 151
#define TK_SNODES 152
#define TK_TRANSACTIONS 153
#define TK_DISTRIBUTED 154
#define TK_CONSUMERS 155
#define TK_SUBSCRIPTIONS 156
#define TK_VNODES 157
#define TK_LIKE 158
#define TK_INDEX 159
#define TK_FUNCTION 160
#define TK_INTERVAL 161
#define TK_TOPIC 162
#define TK_AS 163
#define TK_WITH 164
#define TK_META 165
#define TK_CONSUMER 166
#define TK_GROUP 167
#define TK_DESC 168
#define TK_DESCRIBE 169
#define TK_RESET 170
#define TK_QUERY 171
#define TK_CACHE 172
#define TK_EXPLAIN 173
#define TK_ANALYZE 174
#define TK_VERBOSE 175
#define TK_NK_BOOL 176
#define TK_RATIO 177
#define TK_NK_FLOAT 178
#define TK_OUTPUTTYPE 179
#define TK_AGGREGATE 180
#define TK_BUFSIZE 181
#define TK_STREAM 182
#define TK_INTO 183
#define TK_TRIGGER 184
#define TK_AT_ONCE 185
#define TK_WINDOW_CLOSE 186
#define TK_IGNORE 187
#define TK_EXPIRED 188
#define TK_FILL_HISTORY 189
#define TK_SUBTABLE 190
#define TK_KILL 191
#define TK_CONNECTION 192
#define TK_TRANSACTION 193
#define TK_BALANCE 194
#define TK_VGROUP 195
#define TK_MERGE 196
#define TK_REDISTRIBUTE 197
#define TK_SPLIT 198
#define TK_DELETE 199
#define TK_INSERT 200
#define TK_NULL 201
#define TK_NK_QUESTION 202
#define TK_NK_ARROW 203
#define TK_ROWTS 204
#define TK_TBNAME 205
#define TK_QSTART 206
#define TK_QEND 207
#define TK_QDURATION 208
#define TK_WSTART 209
#define TK_WEND 210
#define TK_WDURATION 211
#define TK_IROWTS 212
#define TK_QTAGS 213
#define TK_CAST 214
#define TK_NOW 215
#define TK_TODAY 216
#define TK_TIMEZONE 217
#define TK_CLIENT_VERSION 218
#define TK_SERVER_VERSION 219
#define TK_SERVER_STATUS 220
#define TK_CURRENT_USER 221
#define TK_COUNT 222
#define TK_LAST_ROW 223
#define TK_CASE 224
#define TK_END 225
#define TK_WHEN 226
#define TK_THEN 227
#define TK_ELSE 228
#define TK_BETWEEN 229
#define TK_IS 230
#define TK_NK_LT 231
#define TK_NK_GT 232
#define TK_NK_LE 233
#define TK_NK_GE 234
#define TK_NK_NE 235
#define TK_MATCH 236
#define TK_NMATCH 237
#define TK_CONTAINS 238
#define TK_IN 239
#define TK_JOIN 240
#define TK_INNER 241
#define TK_SELECT 242
#define TK_DISTINCT 243
#define TK_WHERE 244
#define TK_PARTITION 245
#define TK_BY 246
#define TK_SESSION 247
#define TK_STATE_WINDOW 248
#define TK_SLIDING 249
#define TK_FILL 250
#define TK_VALUE 251
#define TK_NONE 252
#define TK_PREV 253
#define TK_LINEAR 254
#define TK_NEXT 255
#define TK_HAVING 256
#define TK_RANGE 257
#define TK_EVERY 258
#define TK_ORDER 259
#define TK_SLIMIT 260
#define TK_SOFFSET 261
#define TK_LIMIT 262
#define TK_OFFSET 263
#define TK_ASC 264
#define TK_NULLS 265
#define TK_ABORT 266
#define TK_AFTER 267
#define TK_ATTACH 268
#define TK_BEFORE 269
#define TK_BEGIN 270
#define TK_BITAND 271
#define TK_BITNOT 272
#define TK_BITOR 273
#define TK_BLOCKS 274
#define TK_CHANGE 275
#define TK_COMMA 276
#define TK_COMPACT 277
#define TK_CONCAT 278
#define TK_CONFLICT 279
#define TK_COPY 280
#define TK_DEFERRED 281
#define TK_DELIMITERS 282
#define TK_DETACH 283
#define TK_DIVIDE 284
#define TK_DOT 285
#define TK_EACH 286
#define TK_FAIL 287
#define TK_FILE 288
#define TK_FOR 289
#define TK_GLOB 290
#define TK_ID 291
#define TK_IMMEDIATE 292
#define TK_IMPORT 293
#define TK_INITIALLY 294
#define TK_INSTEAD 295
#define TK_ISNULL 296
#define TK_KEY 297
#define TK_MODULES 298
#define TK_NK_BITNOT 299
#define TK_NK_SEMI 300
#define TK_NOTNULL 301
#define TK_OF 302
#define TK_PLUS 303
#define TK_PRIVILEGE 304
#define TK_RAISE 305
#define TK_REPLACE 306
#define TK_RESTRICT 307
#define TK_ROW 308
#define TK_SEMI 309
#define TK_STAR 310
#define TK_STATEMENT 311
#define TK_STRING 312
#define TK_TIMES 313
#define TK_UPDATE 314
#define TK_VALUES 315
#define TK_VARIABLE 316
#define TK_VIEW 317
#define TK_WAL 318
#define TK_NK_SPACE 300
#define TK_NK_COMMENT 301
#define TK_NK_ILLEGAL 302
#define TK_NK_HEX 303 // hex number 0x123
#define TK_NK_OCT 304 // oct number
#define TK_NK_BIN 305 // bin format data 0b111
#define TK_NK_SPACE 600
#define TK_NK_COMMENT 601
#define TK_NK_ILLEGAL 602
#define TK_NK_HEX 603 // hex number 0x123
#define TK_NK_OCT 604 // oct number
#define TK_NK_BIN 605 // bin format data 0b111
#define TK_NK_NIL 65535
......
......@@ -235,6 +235,7 @@ typedef struct SDropDnodeStmt {
int32_t dnodeId;
char fqdn[TSDB_FQDN_LEN];
int32_t port;
bool force;
} SDropDnodeStmt;
typedef struct SAlterDnodeStmt {
......
......@@ -232,6 +232,8 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader);
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex);
int32_t syncEndSnapshot(int64_t rid);
int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
#ifdef __cplusplus
}
#endif
......
......@@ -679,10 +679,13 @@ void syncReconfigFinishLog(const SyncReconfigFinish* pMsg);
void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg);
// ---------------------------------------------
typedef enum {
SYNC_LOCAL_CMD_STEP_DOWN = 100,
} ESyncLocalCmd;
const char* syncLocalCmdGetStr(int32_t cmd);
typedef struct SyncLocalCmd {
uint32_t bytes;
int32_t vgId;
......
......@@ -36,6 +36,7 @@ void *taosMemoryStrDup(const char *ptr);
void taosMemoryFree(void *ptr);
int64_t taosMemorySize(void *ptr);
void taosPrintBackTrace();
void taosMemoryTrim(int32_t size);
#define taosMemoryFreeClear(ptr) \
do { \
......
......@@ -1485,6 +1485,7 @@ int32_t tSerializeSDropDnodeReq(void *buf, int32_t bufLen, SDropDnodeReq *pReq)
if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->fqdn) < 0) return -1;
if (tEncodeI32(&encoder, pReq->port) < 0) return -1;
if (tEncodeI8(&encoder, pReq->force) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -1500,6 +1501,7 @@ int32_t tDeserializeSDropDnodeReq(void *buf, int32_t bufLen, SDropDnodeReq *pReq
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->fqdn) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->port) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->force) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
......
......@@ -20,7 +20,7 @@ static void *dmStatusThreadFp(void *param) {
SDnodeMgmt *pMgmt = param;
int64_t lastTime = taosGetTimestampMs();
setThreadName("dnode-status");
while (1) {
taosMsleep(200);
if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
......@@ -28,6 +28,7 @@ static void *dmStatusThreadFp(void *param) {
int64_t curTime = taosGetTimestampMs();
float interval = (curTime - lastTime) / 1000.0f;
if (interval >= tsStatusInterval) {
taosMemoryTrim(0);
dmSendStatusReq(pMgmt);
lastTime = curTime;
}
......
......@@ -28,7 +28,7 @@ SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId);
void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj);
bool mndIsMnode(SMnode *pMnode, int32_t dnodeId);
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet);
int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj);
int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj, bool force);
#ifdef __cplusplus
}
......
......@@ -29,7 +29,7 @@ void mndCleanupQnode(SMnode *pMnode);
SQnodeObj *mndAcquireQnode(SMnode *pMnode, int32_t qnodeId);
void mndReleaseQnode(SMnode *pMnode, SQnodeObj *pObj);
int32_t mndCreateQnodeList(SMnode *pMnode, SArray **pList, int32_t limit);
int32_t mndSetDropQnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SQnodeObj *pObj);
int32_t mndSetDropQnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SQnodeObj *pObj, bool force);
#ifdef __cplusplus
}
......
......@@ -27,7 +27,7 @@ void mndCleanupSnode(SMnode *pMnode);
SSnodeObj *mndAcquireSnode(SMnode *pMnode, int32_t qnodeId);
void mndReleaseSnode(SMnode *pMnode, SSnodeObj *pObj);
SEpSet mndAcquireEpFromSnode(SMnode *pMnode, const SSnodeObj *pSnode);
int32_t mndSetDropSnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SSnodeObj *pObj);
int32_t mndSetDropSnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SSnodeObj *pObj, bool force);
#ifdef __cplusplus
}
......
......@@ -42,8 +42,7 @@ int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *p
int32_t mndAddAlterVnodeConfirmAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup);
int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType);
int32_t mndAddDropVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool isRedo);
int32_t mndSetMoveVgroupInfoToTrans(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vn, SArray *pArray);
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *, STrans *pTrans, int32_t dropDnodeId);
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *, STrans *pTrans, int32_t dropDnodeId, bool force);
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
SArray *pArray);
......
......@@ -670,7 +670,7 @@ _OVER:
}
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj,
SSnodeObj *pSObj, int32_t numOfVnodes) {
SSnodeObj *pSObj, int32_t numOfVnodes, bool force) {
int32_t code = -1;
SSdbRaw *pRaw = NULL;
STrans *pTrans = NULL;
......@@ -678,7 +678,7 @@ static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SM
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-dnode");
if (pTrans == NULL) goto _OVER;
mndTransSetSerial(pTrans);
mInfo("trans:%d, used to drop dnode:%d", pTrans->id, pDnode->id);
mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
pRaw = mndDnodeActionEncode(pDnode);
if (pRaw == NULL) goto _OVER;
......@@ -694,22 +694,22 @@ static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SM
if (pMObj != NULL) {
mInfo("trans:%d, mnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
if (mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj) != 0) goto _OVER;
if (mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj, force) != 0) goto _OVER;
}
if (pQObj != NULL) {
mInfo("trans:%d, qnode on dnode:%d will be dropped", pTrans->id, pDnode->id);
if (mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj) != 0) goto _OVER;
if (mndSetDropQnodeInfoToTrans(pMnode, pTrans, pQObj, force) != 0) goto _OVER;
}
if (pSObj != NULL) {
mInfo("trans:%d, snode on dnode:%d will be dropped", pTrans->id, pDnode->id);
if (mndSetDropSnodeInfoToTrans(pMnode, pTrans, pSObj) != 0) goto _OVER;
if (mndSetDropSnodeInfoToTrans(pMnode, pTrans, pSObj, force) != 0) goto _OVER;
}
if (numOfVnodes > 0) {
mInfo("trans:%d, %d vnodes on dnode:%d will be dropped", pTrans->id, numOfVnodes, pDnode->id);
if (mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id) != 0) goto _OVER;
if (mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id, force) != 0) goto _OVER;
}
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
......@@ -767,16 +767,16 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
}
int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
if (numOfVnodes > 0 || pMObj != NULL) {
if ((numOfVnodes > 0 || pMObj != NULL || pSObj != NULL || pQObj != NULL) && !dropReq.force) {
if (!mndIsDnodeOnline(pDnode, taosGetTimestampMs())) {
terrno = TSDB_CODE_NODE_OFFLINE;
mError("dnode:%d, failed to drop since %s, has_mnode:%d numOfVnodes:%d", pDnode->id, terrstr(), pMObj != NULL,
numOfVnodes);
mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, terrstr(),
numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
goto _OVER;
}
}
code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, numOfVnodes);
code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, numOfVnodes, dropReq.force);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER:
......
......@@ -586,6 +586,11 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
SRpcMsg rsp = {.code = code, .info = pMsg->info};
tmsgSendRsp(&rsp);
} else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) {
SyncLocalCmd *pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg);
code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg);
syncLocalCmdDestroy(pSyncMsg);
} else {
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
code = -1;
......
......@@ -472,7 +472,8 @@ static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeO
return 0;
}
static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj,
bool force) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SDDropMnodeReq dropReq = {0};
......@@ -485,12 +486,21 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
int32_t totalMnodes = sdbGetSize(pSdb, SDB_MNODE);
if (totalMnodes == 2) {
if (force) {
mError("cant't force drop dnode, since a mnode on it and replica is 2");
terrno = TSDB_CODE_NODE_OFFLINE;
return -1;
}
mInfo("vgId:1, has %d mnodes, exec redo log first", totalMnodes);
if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) return -1;
if (mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet) != 0) return -1;
if (!force) {
if (mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet) != 0) return -1;
}
} else if (totalMnodes == 3) {
mInfo("vgId:1, has %d mnodes, exec redo action first", totalMnodes);
if (mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet) != 0) return -1;
if (!force) {
if (mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet) != 0) return -1;
}
if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) return -1;
} else {
return -1;
......@@ -499,9 +509,9 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
return 0;
}
int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj, bool force) {
if (pObj == NULL) return 0;
if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) return -1;
if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj, force) != 0) return -1;
if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) return -1;
return 0;
}
......@@ -515,7 +525,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
mndTransSetSerial(pTrans);
mInfo("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
if (mndSetDropMnodeInfoToTrans(pMnode, pTrans, pObj) != 0) goto _OVER;
if (mndSetDropMnodeInfoToTrans(pMnode, pTrans, pObj, false) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
......@@ -743,7 +753,7 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
mInfo("vgId:1, mnode sync not reconfig since readyMnodes:%d updatingMnodes:%d", readyMnodes, updatingMnodes);
return;
}
// ASSERT(0);
// ASSERT(0);
if (cfg.myIndex == -1) {
#if 1
......
......@@ -353,11 +353,13 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn
return 0;
}
int32_t mndSetDropQnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SQnodeObj *pObj) {
int32_t mndSetDropQnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SQnodeObj *pObj, bool force) {
if (pObj == NULL) return 0;
if (mndSetDropQnodeRedoLogs(pTrans, pObj) != 0) return -1;
if (mndSetDropQnodeCommitLogs(pTrans, pObj) != 0) return -1;
if (mndSetDropQnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) return -1;
if (!force) {
if (mndSetDropQnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) return -1;
}
return 0;
}
......@@ -368,7 +370,7 @@ static int32_t mndDropQnode(SMnode *pMnode, SRpcMsg *pReq, SQnodeObj *pObj) {
if (pTrans == NULL) goto _OVER;
mInfo("trans:%d, used to drop qnode:%d", pTrans->id, pObj->id);
if (mndSetDropQnodeInfoToTrans(pMnode, pTrans, pObj) != 0) goto _OVER;
if (mndSetDropQnodeInfoToTrans(pMnode, pTrans, pObj, false) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
......
......@@ -360,11 +360,13 @@ static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSn
return 0;
}
int32_t mndSetDropSnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SSnodeObj *pObj) {
int32_t mndSetDropSnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SSnodeObj *pObj, bool force) {
if (pObj == NULL) return 0;
if (mndSetDropSnodeRedoLogs(pTrans, pObj) != 0) return -1;
if (mndSetDropSnodeCommitLogs(pTrans, pObj) != 0) return -1;
if (mndSetDropSnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) return -1;
if (!force) {
if (mndSetDropSnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) return -1;
}
return 0;
}
......@@ -375,7 +377,7 @@ static int32_t mndDropSnode(SMnode *pMnode, SRpcMsg *pReq, SSnodeObj *pObj) {
if (pTrans == NULL) goto _OVER;
mInfo("trans:%d, used to drop snode:%d", pTrans->id, pObj->id);
if (mndSetDropSnodeInfoToTrans(pMnode, pTrans, pObj) != 0) goto _OVER;
if (mndSetDropSnodeInfoToTrans(pMnode, pTrans, pObj, false) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
......@@ -386,9 +388,9 @@ _OVER:
}
static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
SSnodeObj *pObj = NULL;
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
SSnodeObj *pObj = NULL;
SMDropSnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
......
......@@ -2029,7 +2029,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_VND_DROP_STB;
action.acceptableCode = TSDB_CODE_VND_TB_NOT_EXIST;
action.acceptableCode = TSDB_CODE_TDB_STB_NOT_EXIST;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
sdbCancelFetch(pSdb, pIter);
......
......@@ -1071,7 +1071,7 @@ int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgOb
}
int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vnIndex,
SArray *pArray) {
SArray *pArray, bool force) {
SVgObj newVg = {0};
memcpy(&newVg, pVgroup, sizeof(SVgObj));
......@@ -1080,24 +1080,45 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
}
mInfo("vgId:%d, will add 1 vnodes", pVgroup->vgId);
if (mndAddVnodeToVgroup(pMnode, &newVg, pArray) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]) != 0) return -1;
for (int32_t i = 0; i < newVg.replica - 1; ++i) {
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
}
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
if (!force) {
mInfo("vgId:%d, will add 1 vnode", pVgroup->vgId);
if (mndAddVnodeToVgroup(pMnode, &newVg, pArray) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]) != 0) return -1;
for (int32_t i = 0; i < newVg.replica - 1; ++i) {
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
}
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
mInfo("vgId:%d, will remove 1 vnodes", pVgroup->vgId);
newVg.replica--;
SVnodeGid del = newVg.vnodeGid[vnIndex];
newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
for (int32_t i = 0; i < newVg.replica; ++i) {
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
mInfo("vgId:%d, will remove 1 vnode", pVgroup->vgId);
newVg.replica--;
SVnodeGid del = newVg.vnodeGid[vnIndex];
newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
for (int32_t i = 0; i < newVg.replica; ++i) {
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
}
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
} else {
mInfo("vgId:%d, will add 1 vnode and force remove 1 vnode", pVgroup->vgId);
if (mndAddVnodeToVgroup(pMnode, &newVg, pArray) != 0) return -1;
newVg.replica--;
SVnodeGid del = newVg.vnodeGid[vnIndex];
newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]) != 0) return -1;
for (int32_t i = 0; i < newVg.replica; ++i) {
if (i != vnIndex) {
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
}
}
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
if (newVg.replica == 1) {
mInfo("vgId:%d, all data is dropped since replica=1", pVgroup->vgId);
}
}
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
{
SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
......@@ -1120,7 +1141,7 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
return 0;
}
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t delDnodeId) {
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t delDnodeId, bool force) {
int32_t code = 0;
SArray *pArray = mndBuildDnodesArray(pMnode, delDnodeId);
if (pArray == NULL) return -1;
......@@ -1141,9 +1162,9 @@ int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t del
code = 0;
if (vnIndex != -1) {
mInfo("vgId:%d, vnode:%d will be removed from dnode:%d", pVgroup->vgId, vnIndex, delDnodeId);
mInfo("vgId:%d, vnode:%d will be removed from dnode:%d, force:%d", pVgroup->vgId, vnIndex, delDnodeId, force);
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
code = mndSetMoveVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, vnIndex, pArray);
code = mndSetMoveVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, vnIndex, pArray, force);
mndReleaseDb(pMnode, pDb);
}
......
......@@ -1867,7 +1867,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == key) {
init = true;
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -1881,7 +1881,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tRowMerge(&merge, &fRow1);
} else {
init = true;
int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......
......@@ -284,7 +284,8 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
break;
}
vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
vTrace("vgId:%d, process %s request, code:0x%x index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pRsp->code,
version);
walApplyVer(pVnode->pWal, version);
......
......@@ -350,6 +350,11 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg);
syncSnapshotRspDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) {
SyncLocalCmd *pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg);
code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg);
syncLocalCmdDestroy(pSyncMsg);
} else {
vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType);
code = -1;
......
......@@ -23,6 +23,7 @@
#include "tpagedbuf.h"
#include "tsimplehash.h"
#include "vnode.h"
#include "executor.h"
#define T_LONG_JMP(_obj, _c) \
do { \
......@@ -95,27 +96,24 @@ typedef struct SColMatchInfo {
int32_t matchType; // determinate the source according to col id or slot id
} SColMatchInfo;
// If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly
// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups
typedef struct STableListInfo {
bool oneTableForEachGroup;
int32_t numOfOuputGroups; // the data block will be generated one by one
int32_t* groupOffset; // keep the offset value for each group in the tableList
SArray* pTableList;
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
uint64_t suid;
} STableListInfo;
void destroyTableList(STableListInfo* pTableList);
int32_t getNumOfOutputGroups(const STableListInfo* pTableList);
bool oneTableForEachGroup(const STableListInfo* pTableList);
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid);
int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid);
int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num);
uint64_t getTotalTables(const STableListInfo* pTableList);
typedef struct STableListInfo STableListInfo;
struct SqlFunctionCtx;
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* id);
STableListInfo* tableListCreate();
void* tableListDestroy(STableListInfo* pTableListInfo);
void tableListClear(STableListInfo* pTableListInfo);
int32_t tableListGetOutputGroups(const STableListInfo* pTableList);
bool oneTableForEachGroup(const STableListInfo* pTableList);
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid);
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid);
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num);
uint64_t tableListGetSize(const STableListInfo* pTableList);
uint64_t tableListGetSuid(const STableListInfo* pTableList);
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index);
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
void initResultRowInfo(SResultRowInfo* pResultRowInfo);
void closeResultRow(SResultRow* pResultRow);
......@@ -128,6 +126,7 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo
if (forUpdate) {
setBufPageDirty(bufPage, true);
}
SResultRow* pRow = (SResultRow*)((char*)bufPage + pos->offset);
return pRow;
}
......@@ -143,10 +142,7 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode);
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext);
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
STableListInfo* pListInfo);
int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId);
int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableListInfo* pTableListInfo);
size_t getTableTagsBufLen(const SNodeList* pGroups);
SArray* createSortInfo(SNodeList* pNodeList);
......@@ -169,9 +165,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond);
int32_t convertFillType(int32_t mode);
int32_t resultrowComparAsc(const void* p1, const void* p2);
int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified);
#endif // TDENGINE_QUERYUTIL_H
......@@ -184,7 +184,7 @@ typedef struct SExecTaskInfo {
int64_t version; // used for stream to record wal version
SStreamTaskInfo streamInfo;
SSchemaInfo schemaInfo;
STableListInfo tableqinfoList; // this is a table list
STableListInfo* pTableInfoList; // this is a table list
const char* sql; // query sql string
jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
......@@ -555,7 +555,7 @@ typedef struct SSysTableScanInfo {
typedef struct SBlockDistInfo {
SSDataBlock* pResBlock;
void* pHandle;
STsdbReader* pHandle;
SReadHandle readHandle;
uint64_t uid; // table uid
} SBlockDistInfo;
......@@ -970,8 +970,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid,
SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
SExecTaskInfo* pTaskInfo);
......@@ -1065,10 +1065,6 @@ uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, S
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
const char* idstr);
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo,
......@@ -1077,7 +1073,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
bool groupbyTbname(SNodeList* pGroupList);
int32_t setGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort);
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo);
......
......@@ -59,22 +59,23 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
goto _error;
}
STableListInfo* pTableList = &pTaskInfo->tableqinfoList;
STableListInfo* pTableList = pTaskInfo->pTableInfoList;
initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
pInfo->pUidList = taosArrayInit(4, sizeof(int64_t));
// partition by tbname, todo opt perf
if (oneTableForEachGroup(pTableList) || (getTotalTables(pTableList) == 1)) {
if (oneTableForEachGroup(pTableList) || (tableListGetSize(pTableList) == 1)) {
pInfo->retrieveType =
CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW);
STableKeyInfo* pList = taosArrayGet(pTableList->pTableList, 0);
size_t num = taosArrayGetSize(pTableList->pTableList);
STableKeyInfo* pList = tableListGetInfo(pTableList, 0);
size_t num = tableListGetSize(pTableList);
uint64_t suid = tableListGetSuid(pTableList);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader);
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -120,8 +121,10 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
SLastrowScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableListInfo* pTableList = &pTaskInfo->tableqinfoList;
int32_t size = taosArrayGetSize(pTableList->pTableList);
STableListInfo* pTableList = pTaskInfo->pTableInfoList;
uint64_t suid = tableListGetSuid(pTableList);
int32_t size = tableListGetSize(pTableList);
if (size == 0) {
doSetOperatorCompleted(pOperator);
return NULL;
......@@ -184,20 +187,19 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
return NULL;
}
} else {
size_t totalGroups = getNumOfOutputGroups(pTableList);
size_t totalGroups = tableListGetOutputGroups(pTableList);
while (pInfo->currentGroupIndex < totalGroups) {
STableKeyInfo* pList = NULL;
int32_t num = 0;
int32_t code = getTablesOfGroup(pTableList, pInfo->currentGroupIndex, &pList, &num);
int32_t code = tableListGetGroupList(pTableList, pInfo->currentGroupIndex, &pList, &num);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader);
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader);
taosArrayClear(pInfo->pUidList);
code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);
......
......@@ -26,9 +26,31 @@
#include "executorimpl.h"
#include "tcompression.h"
static int32_t removeInvalidTable(SArray* list, SHashObj* tags);
// If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly
// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups
struct STableListInfo {
bool oneTableForEachGroup;
int32_t numOfOuputGroups; // the data block will be generated one by one
int32_t* groupOffset; // keep the offset value for each group in the tableList
SArray* pTableList;
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
uint64_t suid;
};
typedef struct tagFilterAssist {
SHashObj* colHash;
int32_t index;
SArray* cInfoList;
} tagFilterAssist;
static int32_t removeInvalidTable(SArray* uids, SHashObj* tags);
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SHashObj* tags);
static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond);
static int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond,
SNode* pTagIndexCond, STableListInfo* pListInfo);
static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; }
static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; }
void initResultRowInfo(SResultRowInfo* pResultRowInfo) {
pResultRowInfo->size = 0;
......@@ -301,12 +323,6 @@ int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle,
return TSDB_CODE_SUCCESS;
}
typedef struct tagFilterAssist {
SHashObj* colHash;
int32_t index;
SArray* cInfoList;
} tagFilterAssist;
static EDealRes getColumn(SNode** pNode, void* pContext) {
SColumnNode* pSColumnNode = NULL;
if (QUERY_NODE_COLUMN == nodeType((*pNode))) {
......@@ -482,6 +498,7 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray*
}
}
}
pResBlock->info.rows = rows;
// int64_t st1 = taosGetTimestampUs();
......@@ -766,6 +783,7 @@ static int tableUidCompare(const void* a, const void* b) {
}
return u1 < u2 ? -1 : 1;
}
static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* cond, SHashObj* tags) {
int32_t ret = -1;
if (nodeType(cond) == QUERY_NODE_OPERATOR) {
......@@ -820,6 +838,7 @@ static int32_t removeInvalidTable(SArray* uids, SHashObj* tags) {
taosArrayPush(validUid, uid);
}
}
taosArraySwap(uids, validUid);
taosArrayDestroy(validUid);
return 0;
......@@ -930,11 +949,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
STableListInfo* pListInfo) {
int32_t code = TSDB_CODE_SUCCESS;
pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo));
if (pListInfo->pTableList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
uint64_t tableUid = pScanNode->uid;
pListInfo->suid = pScanNode->suid;
SArray* res = taosArrayInit(8, sizeof(uint64_t));
......@@ -994,13 +1008,14 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
size_t numOfTables = taosArrayGetSize(res);
for (int i = 0; i < numOfTables; i++) {
STableKeyInfo info = {.uid = *(uint64_t*)taosArrayGet(res, i), .groupId = 0};
void* p = taosArrayPush(pListInfo->pTableList, &info);
void* p = taosArrayPush(pListInfo->pTableList, &info);
if (p == NULL) {
taosArrayDestroy(res);
return TSDB_CODE_OUT_OF_MEMORY;
}
qDebug("tagfilter get uid:%" PRId64 "", info.uid);
qDebug("tagfilter get uid:%" PRIu64 "", info.uid);
}
taosArrayDestroy(res);
......@@ -1642,9 +1657,6 @@ bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) {
pLimitInfo->slimit.offset != -1);
}
static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; }
static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; }
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) {
SLimit limit = {.limit = getLimit(pLimit), .offset = getOffset(pLimit)};
SLimit slimit = {.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)};
......@@ -1655,11 +1667,23 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit
pLimitInfo->remainGroupOffset = slimit.offset;
}
uint64_t getTotalTables(const STableListInfo* pTableList) {
uint64_t tableListGetSize(const STableListInfo* pTableList) {
ASSERT(taosArrayGetSize(pTableList->pTableList) == taosHashGetSize(pTableList->map));
return taosArrayGetSize(pTableList->pTableList);
}
uint64_t tableListGetSuid(const STableListInfo* pTableList) {
return pTableList->suid;
}
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) {
if (taosArrayGetSize(pTableList->pTableList) == 0) {
return NULL;
}
return taosArrayGet(pTableList->pTableList, index);
}
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
ASSERT(pTableList->map != NULL && slot != NULL);
......@@ -1670,7 +1694,8 @@ uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
return pKeyInfo->groupId;
}
int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
// TODO handle the group offset info, fix it, the rule of group output will be broken by this function
int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid) {
if (pTableList->map == NULL) {
ASSERT(taosArrayGetSize(pTableList->pTableList) == 0);
pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
......@@ -1686,9 +1711,9 @@ int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t
return TSDB_CODE_SUCCESS;
}
int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo,
int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo,
int32_t* size) {
int32_t total = getNumOfOutputGroups(pTableList);
int32_t total = tableListGetOutputGroups(pTableList);
if (ordinalGroupIndex < 0 || ordinalGroupIndex >= total) {
return TSDB_CODE_INVALID_PARA;
}
......@@ -1696,10 +1721,10 @@ int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupI
// here handle two special cases:
// 1. only one group exists, and 2. one table exists for each group.
if (total == 1) {
*size = getTotalTables(pTableList);
*size = tableListGetSize(pTableList);
*pKeyInfo = (*size == 0)? NULL:taosArrayGet(pTableList->pTableList, 0);
return TSDB_CODE_SUCCESS;
} else if (total == getTotalTables(pTableList)) {
} else if (total == tableListGetSize(pTableList)) {
*size = 1;
*pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex);
return TSDB_CODE_SUCCESS;
......@@ -1716,16 +1741,178 @@ int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupI
return TSDB_CODE_SUCCESS;
}
int32_t getNumOfOutputGroups(const STableListInfo* pTableList) { return pTableList->numOfOuputGroups; }
int32_t tableListGetOutputGroups(const STableListInfo* pTableList) { return pTableList->numOfOuputGroups; }
bool oneTableForEachGroup(const STableListInfo* pTableList) { return pTableList->oneTableForEachGroup; }
void destroyTableList(STableListInfo* pTableqinfoList) {
pTableqinfoList->pTableList = taosArrayDestroy(pTableqinfoList->pTableList);
taosMemoryFreeClear(pTableqinfoList->groupOffset);
STableListInfo* tableListCreate() {
STableListInfo* pListInfo = taosMemoryCalloc(1, sizeof(STableListInfo));
if (pListInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
if (pListInfo->pTableList == NULL) {
goto _error;
}
pListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (pListInfo->map == NULL) {
goto _error;
}
pListInfo->numOfOuputGroups = 1;
return pListInfo;
_error:
tableListDestroy(pListInfo);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
void* tableListDestroy(STableListInfo* pTableListInfo) {
if (pTableListInfo == NULL) {
return NULL;
}
pTableListInfo->pTableList = taosArrayDestroy(pTableListInfo->pTableList);
taosMemoryFreeClear(pTableListInfo->groupOffset);
taosHashCleanup(pTableListInfo->map);
pTableListInfo->pTableList = NULL;
pTableListInfo->map = NULL;
taosMemoryFree(pTableListInfo);
return NULL;
}
void tableListClear(STableListInfo* pTableListInfo) {
if (pTableListInfo == NULL) {
return;
}
taosArrayClear(pTableListInfo->pTableList);
taosHashClear(pTableListInfo->map);
taosMemoryFree(pTableListInfo->groupOffset);
pTableListInfo->numOfOuputGroups = 1;
pTableListInfo->oneTableForEachGroup = false;
}
static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
STableKeyInfo* pInfo1 = (STableKeyInfo*) p1;
STableKeyInfo* pInfo2 = (STableKeyInfo*) p2;
if (pInfo1->groupId == pInfo2->groupId) {
return 0;
} else {
return pInfo1->groupId < pInfo2->groupId? -1:1;
}
}
static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
int32_t code = TSDB_CODE_SUCCESS;
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
int32_t size = taosArrayGetSize(pTableListInfo->pTableList);
SArray* pList = taosArrayInit(4, sizeof(int32_t));
STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0);
uint64_t gid = pInfo->groupId;
int32_t start = 0;
taosArrayPush(pList, &start);
for(int32_t i = 1; i < size; ++i) {
pInfo = taosArrayGet(pTableListInfo->pTableList, i);
if (pInfo->groupId != gid) {
taosArrayPush(pList, &i);
gid = pInfo->groupId;
}
}
pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList);
pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
taosArrayDestroy(pList);
return TDB_CODE_SUCCESS;
}
int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) {
int32_t code = TSDB_CODE_SUCCESS;
ASSERT(pTableListInfo->map != NULL);
bool groupByTbname = groupbyTbname(group);
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
if (group == NULL || groupByTbname) {
for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
info->groupId = groupByTbname? info->uid:0;
}
pTableListInfo->oneTableForEachGroup = groupByTbname;
taosHashCleanup(pTableqinfoList->map);
if (groupSort && groupByTbname) {
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
pTableListInfo->numOfOuputGroups = numOfTables;
} else {
pTableListInfo->numOfOuputGroups = 1;
}
} else {
code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pTableqinfoList->pTableList = NULL;
pTableqinfoList->map = NULL;
}
\ No newline at end of file
if (groupSort) {
code = sortTableGroup(pTableListInfo);
}
}
// add all table entry in the hash map
size_t size = taosArrayGetSize(pTableListInfo->pTableList);
for(int32_t i = 0; i < size; ++i) {
STableKeyInfo* p = taosArrayGet(pTableListInfo->pTableList, i);
taosHashPut(pTableListInfo->map, &p->uid, sizeof(uint64_t), &i, sizeof(int32_t));
}
return code;
}
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
const char* idStr) {
int64_t st = taosGetTimestampUs();
if (pHandle == NULL) {
qError("invalid handle, in creating operator tree, %s", idStr);
return TSDB_CODE_INVALID_PARA;
}
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to getTableList, code: %s", tstrerror(code));
return code;
}
ASSERT(pTableListInfo->numOfOuputGroups == 1);
int64_t st1 = taosGetTimestampUs();
qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1 - st) / 1000.0, idStr);
if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
qDebug("no table qualified for query, %s" PRIx64, idStr);
return TSDB_CODE_SUCCESS;
}
code = buildGroupIdMapForAllTables(pTableListInfo, pHandle, pGroupTags, groupSort);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int64_t st2 = taosGetTimestampUs();
qDebug("generate group id map completed, elapsed time:%.2f ms %s", (st2 - st1) / 1000.0, idStr);
return TSDB_CODE_SUCCESS;
}
......@@ -287,14 +287,11 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
STableListInfo* pListInfo = &pTaskInfo->tableqinfoList;
if (isAdd) {
qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str);
}
// traverse to the stream scanner node to add this table id
SOperatorInfo* pInfo = pTaskInfo->pRoot;
while (pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
......@@ -328,7 +325,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
}
}
STableListInfo* pTableListInfo = &pTaskInfo->tableqinfoList;
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
uint64_t* uid = taosArrayGet(qa, i);
......@@ -361,7 +358,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
if (!exists) {
#endif
addTableIntoTableList(pTableListInfo, keyInfo.uid, keyInfo.groupId);
tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId);
}
if (keyBuf != NULL) {
......@@ -925,8 +922,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
int64_t ts = pOffset->ts;
if (uid == 0) {
if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
if (tableListGetSize(pTaskInfo->pTableInfoList) != 0) {
STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
uid = pTableInfo->uid;
ts = INT64_MIN;
} else {
......@@ -937,7 +934,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
/*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
/*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
int32_t numOfTables = getTotalTables(&pTaskInfo->tableqinfoList);
int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
#ifndef NDEBUG
qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid,
......@@ -947,7 +944,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
bool found = false;
for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i);
if (pTableInfo->uid == uid) {
found = true;
pTableScanInfo->currentTable = i;
......@@ -959,8 +956,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
ASSERT(found);
if (pTableScanInfo->dataReader == NULL) {
STableKeyInfo* pList = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
int32_t num = getTotalTables(&pTaskInfo->tableqinfoList);
STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
int32_t num = tableListGetSize(pTaskInfo->pTableInfoList);
if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, pList, num,
&pTableScanInfo->dataReader, NULL) < 0 || pTableScanInfo->dataReader == NULL) {
......@@ -993,22 +990,28 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
SMetaTableInfo mtInfo = getUidfromSnapShot(sContext);
tsdbReaderClose(pInfo->dataReader);
pInfo->dataReader = NULL;
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
taosArrayDestroy(pTaskInfo->tableqinfoList.pTableList);
if (mtInfo.uid == 0) return 0; // no data
tableListClear(pTaskInfo->pTableInfoList);
if (mtInfo.uid == 0) {
return 0; // no data
}
initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
STableListInfo* pListInfo = &pTaskInfo->tableqinfoList;
if (pTaskInfo->pTableInfoList == NULL) {
pTaskInfo->pTableInfoList = tableListCreate();
}
pListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(pListInfo->pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0});
tableListAddTableInfo(pTaskInfo->pTableInfoList, mtInfo.uid, 0);
STableKeyInfo* pList = taosArrayGet(pListInfo->pTableList, 0);
STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
int32_t size = tableListGetSize(pTaskInfo->pTableInfoList);
ASSERT(size == 1);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, taosArrayGetSize(pListInfo->pTableList),
&pInfo->dataReader, NULL);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, &pInfo->dataReader, NULL);
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
......
......@@ -3050,7 +3050,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
}
taosMemoryFreeClear(pOperator);
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
pTaskInfo->code = code;
return NULL;
}
......@@ -3273,6 +3273,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
pTaskInfo->cost.created = taosGetTimestampMs();
pTaskInfo->id.queryId = queryId;
pTaskInfo->execModel = model;
pTaskInfo->pTableInfoList = tableListCreate();
char* p = taosMemoryCalloc(1, 128);
snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
......@@ -3364,117 +3365,6 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); }
static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) {
STableKeyInfo* pInfo1 = (STableKeyInfo*) p1;
STableKeyInfo* pInfo2 = (STableKeyInfo*) p2;
if (pInfo1->groupId == pInfo2->groupId) {
return 0;
} else {
return pInfo1->groupId < pInfo2->groupId? -1:1;
}
}
static int32_t sortTableGroup(STableListInfo* pTableListInfo) {
int32_t code = TSDB_CODE_SUCCESS;
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
int32_t size = taosArrayGetSize(pTableListInfo->pTableList);
SArray* pList = taosArrayInit(4, sizeof(int32_t));
STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0);
uint64_t gid = pInfo->groupId;
int32_t start = 0;
taosArrayPush(pList, &start);
for(int32_t i = 1; i < size; ++i) {
pInfo = taosArrayGet(pTableListInfo->pTableList, i);
if (pInfo->groupId != gid) {
taosArrayPush(pList, &i);
gid = pInfo->groupId;
}
}
pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList);
pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups);
taosArrayDestroy(pList);
# if 0
SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t));
if (sortSupport == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
size_t num = taosArrayGetSize(pTableListInfo->pTableList);
for (int32_t i = 0; i < num; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
uint64_t* groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t));
int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ);
if (index == -1) {
void* p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT);
SArray* tGroup = taosArrayInit(8, sizeof(STableKeyInfo));
if (tGroup == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
if (taosArrayPush(tGroup, info) == NULL) {
qError("taos push info array error");
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
if (p == NULL) {
if (taosArrayPush(sortSupport, groupId) == NULL) {
qError("taos push support array error");
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) == NULL) {
qError("taos push group array error");
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
} else {
int32_t pos = TARRAY_ELEM_IDX(sortSupport, p);
if (taosArrayInsert(sortSupport, pos, groupId) == NULL) {
qError("taos insert support array error");
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
if (taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL) {
qError("taos insert group array error");
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
}
} else {
SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index);
if (taosArrayPush(tGroup, info) == NULL) {
qError("taos push uid array error");
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
}
}
taosArrayDestroy(sortSupport);
#endif
return TDB_CODE_SUCCESS;
_error:
// taosArrayDestroy(sortSupport);
return code;
}
bool groupbyTbname(SNodeList* pGroupList) {
bool bytbname = false;
if (LIST_LENGTH(pGroupList) == 1) {
......@@ -3488,80 +3378,11 @@ bool groupbyTbname(SNodeList* pGroupList) {
return bytbname;
}
int32_t setGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) {
int32_t code = TSDB_CODE_SUCCESS;
pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (pTableListInfo->map == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
bool groupByTbname = groupbyTbname(group);
size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
if (group == NULL || groupByTbname) {
for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
info->groupId = groupByTbname? info->uid:0;
}
pTableListInfo->oneTableForEachGroup = groupByTbname;
if (groupSort && groupByTbname) {
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
pTableListInfo->numOfOuputGroups = numOfTables;
} else {
pTableListInfo->numOfOuputGroups = 1;
}
} else {
code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (groupSort) {
code = sortTableGroup(pTableListInfo);
}
}
// add all table entry in the hash map
size_t size = taosArrayGetSize(pTableListInfo->pTableList);
for(int32_t i = 0; i < size; ++i) {
STableKeyInfo* p = taosArrayGet(pTableListInfo->pTableList, i);
taosHashPut(pTableListInfo->map, &p->uid, sizeof(uint64_t), &i, sizeof(int32_t));
}
return code;
}
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
memset(pCond, 0, sizeof(SQueryTableDataCond));
pCond->order = TSDB_ORDER_ASC;
pCond->numOfCols = 1;
pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
if (pCond->colList == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return terrno;
}
pCond->colList->colId = 1;
pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP;
pCond->colList->bytes = sizeof(TSKEY);
pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
pCond->suid = uid;
pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1;
pCond->endVersion = -1;
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
const char* pUser) {
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
SNode* pTagIndexCond, const char* pUser) {
int32_t type = nodeType(pPhyNode);
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
const char* idstr = GET_TASKID(pTaskInfo);
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
SOperatorInfo* pOperator = NULL;
......@@ -3576,10 +3397,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t code =
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
pTableListInfo, pTagCond, pTagIndexCond, idstr);
if (code) {
pTaskInfo->code = code;
qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), GET_TASKID(pTaskInfo));
qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr);
return NULL;
}
......@@ -3596,7 +3417,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
int32_t code =
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, /*pTableScanNode->groupSort*/true, pHandle,
pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
pTableListInfo, pTagCond, pTagIndexCond, idstr);
if (code) {
pTaskInfo->code = code;
qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
......@@ -3621,7 +3442,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if (pHandle->vnode) {
int32_t code =
createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
pHandle, pTableListInfo, pTagCond, pTagIndexCond, idstr);
if (code) {
pTaskInfo->code = code;
qError("failed to createScanTableListInfo, code: %s", tstrerror(code));
......@@ -3629,11 +3450,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
#ifndef NDEBUG
int32_t sz = taosArrayGetSize(pTableListInfo->pTableList);
int32_t sz = tableListGetSize(pTableListInfo);
qDebug("create stream task, total:%d", sz);
for (int32_t i = 0; i < sz; i++) {
STableKeyInfo* pKeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
qDebug("add table uid:%" PRIu64", gid:%"PRIu64, pKeyInfo->uid, pKeyInfo->groupId);
}
#endif
......@@ -3646,7 +3467,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTagCond, pTagIndexCond, pTableListInfo);
int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
pTagIndexCond, idstr);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("failed to getTableList, code: %s", tstrerror(code));
......@@ -3656,8 +3479,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
pTableListInfo->numOfOuputGroups = 1;
if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
SArray* pList = taosArrayInit(4, sizeof(STableKeyInfo));
......@@ -3667,38 +3488,21 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL;
}
for(int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); ++i) {
for(int32_t i = 0; i < tableListGetSize(pTableListInfo); ++i) {
STableKeyInfo* p = taosArrayGet(pList, i);
addTableIntoTableList(pTableListInfo, p->uid, 0);
tableListAddTableInfo(pTableListInfo, p->uid, 0);
}
taosArrayDestroy(pList);
} else { // Create one table group.
addTableIntoTableList(pTableListInfo, pBlockNode->uid, 0);
} else { // Create group with only one table
tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0);
}
SQueryTableDataCond cond = {0};
int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
size_t num = getTotalTables(pTableListInfo);
void* pList = NULL;
if (num > 0) {
pList = taosArrayGet(pTableListInfo->pTableList, 0);
}
STsdbReader* pReader = NULL;
tsdbReaderOpen(pHandle->vnode, &cond, pList, num, &pReader, "");
cleanupQueryTableDataCond(&cond);
pOperator = createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);
pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
pTagCond, pTagIndexCond, idstr);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;
......@@ -3725,17 +3529,18 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
size_t size = LIST_LENGTH(pPhyNode->pChildren);
SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
if (ops == NULL) {
return NULL;
}
for (int32_t i = 0; i < size; ++i) {
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pUser);
ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTagCond, pTagIndexCond, pUser);
if (ops[i] == NULL) {
taosMemoryFree(ops);
return NULL;
}
ops[i]->resultDataBlockId = pChildNode->pOutputDataBlockDesc->dataBlockId;
}
SOperatorInfo* pOptr = NULL;
......@@ -4000,15 +3805,18 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT
if (NULL == pDeleterParam) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t tbNum = taosArrayGetSize(pTask->tableqinfoList.pTableList);
pDeleterParam->suid = pTask->tableqinfoList.suid;
int32_t tbNum = tableListGetSize(pTask->pTableInfoList);
pDeleterParam->suid = tableListGetSuid(pTask->pTableInfoList);
// TODO extract uid list
pDeleterParam->pUidList = taosArrayInit(tbNum, sizeof(uint64_t));
if (NULL == pDeleterParam->pUidList) {
taosMemoryFree(pDeleterParam);
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < tbNum; ++i) {
STableKeyInfo* pTable = taosArrayGet(pTask->tableqinfoList.pTableList, i);
STableKeyInfo* pTable = tableListGetInfo(pTask->pTableInfoList, i);
taosArrayPush(pDeleterParam->pUidList, &pTable->uid);
}
......@@ -4044,8 +3852,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
sql = NULL;
(*pTaskInfo)->pSubplan = pPlan;
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList,
pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);
(*pTaskInfo)->pRoot =
createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);
if (NULL == (*pTaskInfo)->pRoot) {
code = (*pTaskInfo)->code;
......@@ -4064,7 +3872,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));
destroyTableList(&pTaskInfo->tableqinfoList);
pTaskInfo->pTableInfoList = tableListDestroy(pTaskInfo->pTableInfoList);
destroyOperatorInfo(pTaskInfo->pRoot);
cleanupTableSchemaInfo(&pTaskInfo->schemaInfo);
cleanupStreamInfo(&pTaskInfo->streamInfo);
......
......@@ -623,7 +623,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
pBlock->info = binfo;
ASSERT(binfo.uid != 0);
pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid);
pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid);
uint32_t status = 0;
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
......@@ -719,7 +719,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
// scan table one by one sequentially
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
int32_t numOfTables = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
while (1) {
SSDataBlock* result = doTableScanGroup(pOperator);
......@@ -733,7 +733,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return NULL;
}
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable);
STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable);
tsdbSetTableList(pInfo->dataReader, pTableInfo, 1);
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables,
pInfo->currentTable, pTaskInfo->id.str);
......@@ -743,14 +743,14 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
}
} else { // scan table group by group sequentially
if (pInfo->currentGroupId == -1) {
if ((++pInfo->currentGroupId) >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)) {
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) {
doSetOperatorCompleted(pOperator);
return NULL;
}
int32_t num = 0;
STableKeyInfo* pList = NULL;
getTablesOfGroup(&pTaskInfo->tableqinfoList, pInfo->currentGroupId, &pList, &num);
tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);
ASSERT(pInfo->dataReader == NULL);
int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, pList, num, (STsdbReader**)&pInfo->dataReader,
......@@ -766,7 +766,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return result;
}
if ((++pInfo->currentGroupId) >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)) {
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) {
doSetOperatorCompleted(pOperator);
return NULL;
}
......@@ -778,7 +778,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
int32_t num = 0;
STableKeyInfo* pList = NULL;
getTablesOfGroup(&pTaskInfo->tableqinfoList, pInfo->currentGroupId, &pList, &num);
tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);
tsdbSetTableList(pInfo->dataReader, pList, num);
tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
......@@ -1000,8 +1000,31 @@ static void destroyBlockDistScanOperatorInfo(void* param) {
taosMemoryFreeClear(param);
}
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid,
SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo) {
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
memset(pCond, 0, sizeof(SQueryTableDataCond));
pCond->order = TSDB_ORDER_ASC;
pCond->numOfCols = 1;
pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
if (pCond->colList == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return terrno;
}
pCond->colList->colId = 1;
pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP;
pCond->colList->bytes = sizeof(TSKEY);
pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
pCond->suid = uid;
pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1;
pCond->endVersion = -1;
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo) {
SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -1009,9 +1032,24 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re
goto _error;
}
pInfo->pHandle = dataReader;
{
SQueryTableDataCond cond = {0};
int32_t code = initTableblockDistQueryCond(pBlockScanNode->suid, &cond);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
size_t num = tableListGetSize(pTableListInfo);
void* pList = tableListGetInfo(pTableListInfo, 0);
tsdbReaderOpen(readHandle->vnode, &cond, pList, num, &pInfo->pHandle, pTaskInfo->id.str);
cleanupQueryTableDataCond(&cond);
}
pInfo->readHandle = *readHandle;
pInfo->uid = uid;
pInfo->uid = pBlockScanNode->suid;
pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc);
int32_t numOfCols = 0;
......@@ -1119,7 +1157,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, binfo.uid);
pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, binfo.uid);
}
tsdbReaderClose(pReader);
......@@ -1139,8 +1177,8 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts,
}
static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) {
return getTableGroupId(&pInfo->pTableScanOp->pTaskInfo->tableqinfoList, uid);
// SHashObj* map = pInfo->pTableScanOp->pTaskInfo->tableqinfoList.map;
return getTableGroupId(pInfo->pTableScanOp->pTaskInfo->pTableInfoList, uid);
// SHashObj* map = pInfo->pTableScanOp->pTaskInfo->pTableInfoList.map;
// uint64_t* groupId = taosHashGet(map, &uid, sizeof(int64_t));
// if (groupId) {
// return *groupId;
......@@ -1564,7 +1602,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
pInfo->pRes->info.type = STREAM_NORMAL;
pInfo->pRes->info.version = pBlock->info.version;
pInfo->pRes->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid);
pInfo->pRes->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid);
// todo extract method
for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
......@@ -2076,12 +2114,13 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
}
static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
static SArray* extractTableIdList(const STableListInfo* pTableListInfo) {
SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t));
// Transfer the Array of STableKeyInfo into uid list.
for (int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pTableList); ++i) {
STableKeyInfo* pkeyInfo = taosArrayGet(pTableGroupInfo->pTableList, i);
size_t size = tableListGetSize(pTableListInfo);
for (int32_t i = 0; i < size; ++i) {
STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i);
taosArrayPush(tableIdList, &pkeyInfo->uid);
}
......@@ -2350,7 +2389,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
STableKeyInfo* pList = NULL;
int32_t num = 0;
getTablesOfGroup(&pTaskInfo->tableqinfoList, 0, &pList, &num);
tableListGetGroupList(pTaskInfo->pTableInfoList, 0, &pList, &num);
if (pHandle->initTableReader) {
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
......@@ -2383,7 +2422,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
// set the extract column id to streamHandle
tqReaderSetColIdList(pInfo->tqReader, pColIds);
SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList);
SArray* tableIdList = extractTableIdList(pTaskInfo->pTableInfoList);
code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList);
if (code != 0) {
taosArrayDestroy(tableIdList);
......@@ -4082,7 +4121,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes);
int32_t size = taosArrayGetSize(pInfo->pTableList->pTableList);
int32_t size = tableListGetSize(pTaskInfo->pTableInfoList);
if (size == 0) {
setTaskStatus(pTaskInfo, TASK_COMPLETED);
return NULL;
......@@ -4094,7 +4133,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
metaReaderInit(&mr, pInfo->readHandle.meta, 0);
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
STableKeyInfo* item = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos);
STableKeyInfo* item = tableListGetInfo(pInfo->pTableList, pInfo->curPos);
int32_t code = metaGetTableEntryByUid(&mr, item->uid);
tDecoderClear(&mr.coder);
if (code != TSDB_CODE_SUCCESS) {
......@@ -4209,47 +4248,10 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
return NULL;
}
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
const char* idStr) {
int64_t st = taosGetTimestampUs();
if (pHandle == NULL) {
qError("invalid handle, in creating operator tree, %s", idStr);
return TSDB_CODE_INVALID_PARA;
}
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to getTableList, code: %s", tstrerror(code));
return code;
}
pTableListInfo->numOfOuputGroups = 1;
int64_t st1 = taosGetTimestampUs();
qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1 - st) / 1000.0, idStr);
if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
qDebug("no table qualified for query, %s" PRIx64, idStr);
return TSDB_CODE_SUCCESS;
}
code = setGroupIdMapForAllTables(pTableListInfo, pHandle, pGroupTags, groupSort);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int64_t st2 = taosGetTimestampUs();
qDebug("generate group id map completed, elapsed time:%.2f ms %s", (st2 - st1) / 1000.0, idStr);
return TSDB_CODE_SUCCESS;
}
int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo,
int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, const char* idstr) {
for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) {
STableKeyInfo* pList = taosArrayGet(pTableListInfo->pTableList, i);
STableKeyInfo* pList = tableListGetInfo(pTableListInfo, i);
STsdbReader* pReader = NULL;
tsdbReaderOpen(pHandle->vnode, pQueryCond, pList, 1, &pReader, idstr);
taosArrayPush(arrayReader, &pReader);
......@@ -4262,7 +4264,7 @@ int32_t createMultipleDataReaders2(SQueryTableDataCond* pQueryCond, SReadHandle*
STableListInfo* pTableListInfo, int32_t tableStartIdx, int32_t tableEndIdx,
STsdbReader** ppReader, const char* idstr) {
STsdbReader* pReader = NULL;
void* pStart = taosArrayGet(pTableListInfo->pTableList, tableStartIdx);
void* pStart = tableListGetInfo(pTableListInfo, tableStartIdx);
int32_t num = tableEndIdx - tableStartIdx + 1;
int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, pStart, num, &pReader, idstr);
......@@ -4522,7 +4524,7 @@ static SSDataBlock* getTableDataBlockTemp(void* param) {
int64_t st = taosGetTimestampUs();
void* p =taosArrayGet(pInfo->tableListInfo->pTableList, readIdx + pInfo->tableStartIndex);
void* p = tableListGetInfo(pInfo->tableListInfo, readIdx + pInfo->tableStartIndex);
SReadHandle* pHandle = &pInfo->readHandle;
tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->pReader, GET_TASKID(pTaskInfo));
......@@ -4565,7 +4567,7 @@ static SSDataBlock* getTableDataBlockTemp(void* param) {
continue;
}
pBlock->info.groupId = getTableGroupId(&pOperator->pTaskInfo->tableqinfoList, pBlock->info.uid);
pBlock->info.groupId = getTableGroupId(pOperator->pTaskInfo->pTableInfoList, pBlock->info.uid);
pOperator->resultInfo.totalRows += pBlock->info.rows; // pTableScanInfo->readRecorder.totalRows;
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
......@@ -4622,7 +4624,7 @@ static SSDataBlock* getTableDataBlock2(void* param) {
continue;
}
pBlock->info.groupId = getTableGroupId(&pOperator->pTaskInfo->tableqinfoList, pBlock->info.uid);
pBlock->info.groupId = getTableGroupId(pOperator->pTaskInfo->pTableInfoList, pBlock->info.uid);
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
......@@ -4676,7 +4678,7 @@ static SSDataBlock* getTableDataBlock(void* param) {
continue;
}
pBlock->info.groupId = getTableGroupId(&pOperator->pTaskInfo->tableqinfoList, pBlock->info.uid);
pBlock->info.groupId = getTableGroupId(pOperator->pTaskInfo->pTableInfoList, pBlock->info.uid);
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
......@@ -4719,10 +4721,10 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
{
size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
size_t numOfTables = tableListGetSize(pInfo->tableListInfo);
int32_t i = pInfo->tableStartIndex + 1;
for (; i < tableListSize; ++i) {
STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i);
for (; i < numOfTables; ++i) {
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->tableListInfo, i);
if (tableKeyInfo->groupId != pInfo->groupId) {
break;
}
......@@ -4847,7 +4849,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, code);
}
size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
size_t tableListSize = tableListGetSize(pInfo->tableListInfo);
if (!pInfo->hasGroupId) {
pInfo->hasGroupId = true;
......@@ -4856,7 +4858,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
return NULL;
}
pInfo->tableStartIndex = 0;
pInfo->groupId = ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId;
pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->tableListInfo, pInfo->tableStartIndex))->groupId;
startGroupTableMergeScan(pOperator);
}
......@@ -4875,8 +4877,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
break;
}
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
pInfo->groupId =
((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId;
pInfo->groupId = tableListGetInfo(pInfo->tableListInfo, pInfo->tableStartIndex)->groupId;
startGroupTableMergeScan(pOperator);
}
}
......
......@@ -182,7 +182,7 @@ SNode* createCreateUserStmt(SAstCreateContext* pCxt, SToken* pUserName, const ST
SNode* createAlterUserStmt(SAstCreateContext* pCxt, SToken* pUserName, int8_t alterType, const SToken* pVal);
SNode* createDropUserStmt(SAstCreateContext* pCxt, SToken* pUserName);
SNode* createCreateDnodeStmt(SAstCreateContext* pCxt, const SToken* pFqdn, const SToken* pPort);
SNode* createDropDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode);
SNode* createDropDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, bool force);
SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const SToken* pConfig, const SToken* pValue);
SNode* createCreateIndexStmt(SAstCreateContext* pCxt, EIndexType type, bool ignoreExists, SNode* pIndexName,
SNode* pRealTable, SNodeList* pCols, SNode* pOptions);
......
......@@ -120,8 +120,8 @@ priv_level(A) ::= db_name(B) NK_DOT NK_STAR.
/************************************************ create/drop/alter dnode *********************************************/
cmd ::= CREATE DNODE dnode_endpoint(A). { pCxt->pRootNode = createCreateDnodeStmt(pCxt, &A, NULL); }
cmd ::= CREATE DNODE dnode_endpoint(A) PORT NK_INTEGER(B). { pCxt->pRootNode = createCreateDnodeStmt(pCxt, &A, &B); }
cmd ::= DROP DNODE NK_INTEGER(A). { pCxt->pRootNode = createDropDnodeStmt(pCxt, &A); }
cmd ::= DROP DNODE dnode_endpoint(A). { pCxt->pRootNode = createDropDnodeStmt(pCxt, &A); }
cmd ::= DROP DNODE NK_INTEGER(A) force_opt(B). { pCxt->pRootNode = createDropDnodeStmt(pCxt, &A, B); }
cmd ::= DROP DNODE dnode_endpoint(A) force_opt(B). { pCxt->pRootNode = createDropDnodeStmt(pCxt, &A, B); }
cmd ::= ALTER DNODE NK_INTEGER(A) NK_STRING(B). { pCxt->pRootNode = createAlterDnodeStmt(pCxt, &A, &B, NULL); }
cmd ::= ALTER DNODE NK_INTEGER(A) NK_STRING(B) NK_STRING(C). { pCxt->pRootNode = createAlterDnodeStmt(pCxt, &A, &B, &C); }
cmd ::= ALTER ALL DNODES NK_STRING(A). { pCxt->pRootNode = createAlterDnodeStmt(pCxt, NULL, &A, NULL); }
......@@ -133,6 +133,11 @@ dnode_endpoint(A) ::= NK_STRING(B).
dnode_endpoint(A) ::= NK_ID(B). { A = B; }
dnode_endpoint(A) ::= NK_IPTOKEN(B). { A = B; }
%type force_opt { bool }
%destructor force_opt { }
force_opt(A) ::= . { A = false; }
force_opt(A) ::= FORCE. { A = true; }
/************************************************ alter local *********************************************************/
cmd ::= ALTER LOCAL NK_STRING(A). { pCxt->pRootNode = createAlterLocalStmt(pCxt, &A, NULL); }
cmd ::= ALTER LOCAL NK_STRING(A) NK_STRING(B). { pCxt->pRootNode = createAlterLocalStmt(pCxt, &A, &B); }
......
......@@ -1457,7 +1457,7 @@ SNode* createCreateDnodeStmt(SAstCreateContext* pCxt, const SToken* pFqdn, const
return (SNode*)pStmt;
}
SNode* createDropDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode) {
SNode* createDropDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, bool force) {
CHECK_PARSER_STATUS(pCxt);
SDropDnodeStmt* pStmt = (SDropDnodeStmt*)nodesMakeNode(QUERY_NODE_DROP_DNODE_STMT);
CHECK_OUT_OF_MEM(pStmt);
......@@ -1469,6 +1469,7 @@ SNode* createDropDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode) {
return NULL;
}
}
pStmt->force = force;
return (SNode*)pStmt;
}
......
......@@ -97,6 +97,7 @@ static SKeyword keywordTable[] = {
{"FLOAT", TK_FLOAT},
{"FLUSH", TK_FLUSH},
{"FROM", TK_FROM},
{"FORCE", TK_FORCE},
{"FUNCTION", TK_FUNCTION},
{"FUNCTIONS", TK_FUNCTIONS},
{"GRANT", TK_GRANT},
......
......@@ -674,6 +674,10 @@ static bool isSelectFunc(const SNode* pNode) {
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsSelectFunc(((SFunctionNode*)pNode)->funcId));
}
static bool isWindowPseudoColumnFunc(const SNode* pNode) {
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsWindowPseudoColumnFunc(((SFunctionNode*)pNode)->funcId));
}
static bool isTimelineFunc(const SNode* pNode) {
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsTimelineFunc(((SFunctionNode*)pNode)->funcId));
}
......@@ -1264,10 +1268,7 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
}
static EDealRes haveVectorFunction(SNode* pNode, void* pContext) {
if (isAggFunc(pNode)) {
*((bool*)pContext) = true;
return DEAL_RES_END;
} else if (isIndefiniteRowsFunc(pNode)) {
if (isAggFunc(pNode) || isIndefiniteRowsFunc(pNode) || isWindowPseudoColumnFunc(pNode)) {
*((bool*)pContext) = true;
return DEAL_RES_END;
}
......@@ -4995,6 +4996,7 @@ static int32_t translateDropDnode(STranslateContext* pCxt, SDropDnodeStmt* pStmt
dropReq.dnodeId = pStmt->dnodeId;
strcpy(dropReq.fqdn, pStmt->fqdn);
dropReq.port = pStmt->port;
dropReq.force = pStmt->force;
return buildCmdMsg(pCxt, TDMT_MND_DROP_DNODE, (FSerializeFunc)tSerializeSDropDnodeReq, &dropReq);
}
......
此差异已折叠。
......@@ -87,7 +87,6 @@ TEST_F(ParserInitialDTest, dropConsumerGroup) {
// todo DROP database
// todo DROP dnode
TEST_F(ParserInitialDTest, dropDnode) {
useDb("root", "test");
......@@ -95,11 +94,15 @@ TEST_F(ParserInitialDTest, dropDnode) {
auto clearDropDnodeReq = [&]() { memset(&expect, 0, sizeof(SDropDnodeReq)); };
auto setDropDnodeReqById = [&](int32_t dnodeId) { expect.dnodeId = dnodeId; };
auto setDropDnodeReqById = [&](int32_t dnodeId, bool force = false) {
expect.dnodeId = dnodeId;
expect.force = force;
};
auto setDropDnodeReqByEndpoint = [&](const char* pFqdn, int32_t port) {
auto setDropDnodeReqByEndpoint = [&](const char* pFqdn, int32_t port, bool force = false) {
strcpy(expect.fqdn, pFqdn);
expect.port = port;
expect.force = force;
};
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
......@@ -110,15 +113,24 @@ TEST_F(ParserInitialDTest, dropDnode) {
ASSERT_EQ(req.dnodeId, expect.dnodeId);
ASSERT_EQ(std::string(req.fqdn), std::string(expect.fqdn));
ASSERT_EQ(req.port, expect.port);
ASSERT_EQ(req.force, expect.force);
});
setDropDnodeReqById(1);
run("DROP DNODE 1");
clearDropDnodeReq();
setDropDnodeReqById(2, true);
run("DROP DNODE 2 FORCE");
clearDropDnodeReq();
setDropDnodeReqByEndpoint("host1", 7030);
run("DROP DNODE 'host1:7030'");
clearDropDnodeReq();
setDropDnodeReqByEndpoint("host2", 8030, true);
run("DROP DNODE 'host2:8030' FORCE");
clearDropDnodeReq();
}
// todo DROP function
......
......@@ -341,6 +341,8 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s);
// for debug --------------
void syncNodePrint(SSyncNode* pObj);
void syncNodePrint2(char* s, SSyncNode* pObj);
......
......@@ -47,6 +47,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeEqNoop(SSyncNode* ths);
static int32_t syncNodeAppendNoop(SSyncNode* ths);
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
// process message ----
int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg);
......@@ -528,6 +529,20 @@ int32_t syncEndSnapshot(int64_t rid) {
return code;
}
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
ASSERT(rid == pSyncNode->rid);
syncNodeStepDown(pSyncNode, newTerm);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return 0;
}
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
if (pSyncNode->peersNum == 0) {
sDebug("only one replica, cannot leader transfer");
......@@ -1132,7 +1147,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
goto _error;
}
if (pSyncInfo->syncCfg.replicaNum > 0 && pSyncInfo->syncCfg.replicaNum != pSyncNode->pRaftCfg->cfg.replicaNum) {
if (pSyncInfo->syncCfg.replicaNum > 0 && syncIsConfigChanged(&pSyncNode->pRaftCfg->cfg, &pSyncInfo->syncCfg)) {
sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
pSyncNode->pRaftCfg->cfg = pSyncInfo->syncCfg;
if (raftCfgPersist(pSyncNode->pRaftCfg) != 0) {
......@@ -2095,12 +2111,11 @@ static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
#if 1
if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
sInfo("vgId:1, sync not reconfig since not changed");
return;
}
#endif
pSyncNode->pRaftCfg->cfg = *pNewConfig;
pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;
......@@ -3041,12 +3056,6 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
SRpcMsg rpcMsg;
syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg);
#if 1
if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
syncNodeStepDown(ths, pMsg->term);
}
#endif
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
syncNodeResetElectTimer(ths);
ths->minMatchIndex = pMsg->minMatchIndex;
......@@ -3058,6 +3067,28 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
#endif
}
if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
// syncNodeStepDown(ths, pMsg->term);
SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId);
pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
pSyncMsg->sdNewTerm = pMsg->term;
SRpcMsg rpcMsgLocalCmd;
syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);
if (ths->FpEqMsg != NULL && ths->msgcb != NULL) {
int32_t code = ths->FpEqMsg(ths->msgcb, &rpcMsgLocalCmd);
if (code != 0) {
sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
rpcFreeCont(rpcMsgLocalCmd.pCont);
} else {
sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRIu64, ths->vgId, pSyncMsg->sdNewTerm);
}
}
syncLocalCmdDestroy(pSyncMsg);
}
/*
// htonl
SMsgHead* pHead = rpcMsg.pCont;
......@@ -3081,6 +3112,19 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) {
return 0;
}
int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) {
syncLogRecvLocalCmd(ths, pMsg, "");
if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
syncNodeStepDown(ths, pMsg->sdNewTerm);
} else {
syncNodeErrorLog(ths, "error local cmd");
}
return 0;
}
// TLA+ Spec
// ClientRequest(i, v) ==
// /\ state[i] = Leader
......@@ -3379,7 +3423,7 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
// ASSERT(code == 0);
// ASSERT(pEntry != NULL);
if (code != 0 || pEntry == NULL) {
syncNodeErrorLog(ths, "get log entry error");
syncNodeErrorLog(ths, "get log entry error");
continue;
}
}
......@@ -3743,3 +3787,10 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p
host, port, pMsg->term, pMsg->privateTerm, s);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRIu64 "}, %s", pMsg->cmd,
syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, s);
syncNodeEventLog(pSyncNode, logBuf);
}
\ No newline at end of file
......@@ -3097,6 +3097,14 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg) {
}
// ---------------------------------------------
const char* syncLocalCmdGetStr(int32_t cmd) {
if (cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
return "step-down";
}
return "unknown-local-cmd";
}
SyncLocalCmd* syncLocalCmdBuild(int32_t vgId) {
uint32_t bytes = sizeof(SyncLocalCmd);
SyncLocalCmd* pMsg = taosMemoryMalloc(bytes);
......
......@@ -336,3 +336,12 @@ int64_t taosMemorySize(void *ptr) {
#endif
#endif
}
void taosMemoryTrim(int32_t size) {
#if defined(WINDOWS) || defined(DARWIN)
// do nothing
return;
#else
malloc_trim(size);
#endif
}
......@@ -915,7 +915,7 @@ int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) {
int32_t olen, vlen, vlen2, vlen3;
int32_t code = 0;
if (url == NULL || strlen(url) == 0) {
uInfo("fail to load apoll url");
uInfo("apoll url not load");
return 0;
}
......
......@@ -48,6 +48,7 @@
./test.sh -f tsim/dnode/drop_dnode_has_vnode_replica3.sim
./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica1.sim
./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica3.sim
./test.sh -f tsim/dnode/drop_dnode_force.sim
./test.sh -f tsim/dnode/offline_reason.sim
./test.sh -f tsim/dnode/redistribute_vgroup_replica1.sim
./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/deploy.sh -n dnode5 -i 5
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sql connect
print =============== step1 create dnode2 dnode3 dnode4 dnode 5
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sql create dnode $hostname port 7400
sql create dnode $hostname port 7500
$x = 0
step1:
$ = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not online!
return -1
endi
sql select * from information_schema.ins_dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 5 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
if $data(2)[4] != ready then
goto step1
endi
if $data(3)[4] != ready then
goto step1
endi
if $data(4)[4] != ready then
goto step1
endi
if $data(5)[4] != offline then
goto step1
endi
print =============== step2 create database
sql create database d1 vgroups 1 replica 3
sql use d1
sql create table d1.st0 (ts timestamp, i int) tags (j int)
sql create table d1.c0 using st0 tags(0)
sql create table d1.c1 using st0 tags(1)
sql create table d1.c2 using st0 tags(2)
sql create table d1.c3 using st0 tags(3)
sql create table d1.c4 using st0 tags(4)
sql create table d1.c5 using st0 tags(5)
sql create table d1.c6 using st0 tags(6)
sql create table d1.c7 using st0 tags(7)
sql create table d1.c8 using st0 tags(8)
sql create table d1.c9 using st0 tags(9)
sql show d1.tables
if $rows != 10 then
return -1
endi
print d1.rows ===> $rows
sql select * from information_schema.ins_tables where stable_name = 'st0' and db_name = 'd1'
if $rows != 10 then
return -1
endi
sql create database d2 vgroups 3 replica 1
sql use d2
sql create table d2.st1 (ts timestamp, i int) tags (j int)
sql create table d2.c10 using st1 tags(0)
sql create table d2.c11 using st1 tags(1)
sql create table d2.c12 using st1 tags(2)
sql create table d2.c13 using st1 tags(3)
sql create table d2.c14 using st1 tags(4)
sql create table d2.c15 using st1 tags(5)
sql create table d2.c16 using st1 tags(6)
sql create table d2.c17 using st1 tags(7)
sql create table d2.c18 using st1 tags(8)
sql create table d2.c19 using st1 tags(9)
sql create table d2.c190 using st1 tags(9)
sql show d2.tables
if $rows != 11 then
return -1
endi
sql reset query cache
sql select * from information_schema.ins_tables where stable_name = 'st1' and db_name = 'd2'
print d2.st1.tables ===> $rows
if $rows != 11 then
return -1
endi
sql create table d2.st2 (ts timestamp, i int) tags (j int)
sql create table d2.c20 using st2 tags(0)
sql create table d2.c21 using st2 tags(1)
sql create table d2.c22 using st2 tags(2)
sql create table d2.c23 using st2 tags(3)
sql create table d2.c24 using st2 tags(4)
sql create table d2.c25 using st2 tags(5)
sql create table d2.c26 using st2 tags(6)
sql create table d2.c27 using st2 tags(7)
sql create table d2.c28 using st2 tags(8)
sql create table d2.c29 using st2 tags(9)
sql create table d2.c290 using st2 tags(9)
sql create table d2.c291 using st2 tags(9)
sql show d2.tables
if $rows != 23 then
return -1
endi
sql reset query cache
sql select * from information_schema.ins_tables where stable_name = 'st2' and db_name = 'd2'
print d2.st2.tables ===> $rows
if $rows != 12 then
return -1
endi
print =============== step3: create qnode snode on dnode 2
sql create qnode on dnode 2
sql create snode on dnode 2
sql select * from information_schema.ins_qnodes
if $rows != 1 then
return -1
endi
sql show snodes
if $rows != 1 then
return -1
endi
print =============== step4: create mnode on dnode 2
sql create mnode on dnode 3
sql create mnode on dnode 2
$x = 0
step4:
$x = $x + 1
sleep 1000
if $x == 10 then
return -1
endi
sql select * from information_schema.ins_mnodes -x step4
print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4]
print $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4]
print $data[2][0] $data[2][1] $data[2][2] $data[2][3] $data[1][4]
#if $data(1)[2] != leader then
# goto step4
#endi
if $data(2)[2] != follower then
goto step4
endi
#if $data(3)[2] != follower then
# goto step4
#endi
print =============== step5: create dnode 5
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode5 -s start
$x = 0
step5:
$ = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not online!
return -1
endi
sql select * from information_schema.ins_dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
print ===> $data20 $data21 $data22 $data23 $data24 $data25
print ===> $data30 $data31 $data32 $data33 $data34 $data35
print ===> $data40 $data41 $data42 $data43 $data44 $data45
if $rows != 5 then
return -1
endi
if $data(1)[4] != ready then
goto step5
endi
if $data(2)[4] != offline then
goto step5
endi
if $data(3)[4] != ready then
goto step5
endi
if $data(4)[4] != ready then
goto step5
endi
if $data(5)[4] != ready then
goto step5
endi
print =============== step5: drop dnode 2
sql_error drop dnode 2
sql drop dnode 2 force
print select * from information_schema.ins_dnodes;
sql select * from information_schema.ins_dnodes;
print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4]
print $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4]
if $rows != 4 then
return -1
endi
print select * from information_schema.ins_mnodes;
sql select * from information_schema.ins_mnodes
print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4]
print $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4]
if $rows != 2 then
return -1
endi
if $data(1)[2] != leader then
return -1
endi
sql select * from information_schema.ins_qnodes
if $rows != 0 then
return -1
endi
sql show snodes
if $rows != 0 then
return -1
endi
print =============== step6: check d1
sql reset query cache
sql show d1.tables
if $rows != 10 then
return -1
endi
print =============== step7: check d2
sql show d2.tables
print ===> d2.tables: $rows remained
if $rows > 23 then
return -1
endi
if $rows <= 0 then
return -1
endi
print =============== step8: drop stable and recreate it
sql select * from information_schema.ins_tables where stable_name = 'st2' and db_name = 'd2'
print d2.st2.tables ==> $rows
sql drop table d2.st2;
sql create table d2.st2 (ts timestamp, i int) tags (j int)
sql create table d2.c20 using st2 tags(0)
sql create table d2.c21 using st2 tags(1)
sql create table d2.c22 using st2 tags(2)
sql create table d2.c23 using st2 tags(3)
sql create table d2.c24 using st2 tags(4)
sql create table d2.c25 using st2 tags(5)
sql create table d2.c26 using st2 tags(6)
sql create table d2.c27 using st2 tags(7)
sql create table d2.c28 using st2 tags(8)
sql create table d2.c29 using st2 tags(9)
sql create table d2.c30 using st2 tags(9)
sql create table d2.c31 using st2 tags(9)
sql create table d2.c32 using st2 tags(9)
sql select * from information_schema.ins_tables where stable_name = 'st2' and db_name = 'd2'
print d2.st2.tables ==> $rows
if $rows != 13 then
return -1
endi
print =============== step9: alter stable
return
print By modifying the stable, the missing stable information can be reconstructed in the vnode.
print However, currently, getting the stable meta from the vnode, and return the table not exist
print To handle this, we need to modify the way stable-meta is fetched
sql select * from information_schema.ins_tables where stable_name = 'st1' and db_name = 'd2'
print d2.st1.tables ==> $rows
$remains = $rows
sql alter table d2.st1 add column b smallint
return
sql create table d2.c30 using st tags(0)
sql create table d2.c31 using st tags(1)
sql create table d2.c32 using st tags(2)
sql create table d2.c33 using st tags(3)
sql create table d2.c34 using st tags(4)
sql create table d2.c35 using st tags(5)
sql create table d2.c36 using st tags(6)
sql create table d2.c37 using st tags(7)
sql create table d2.c38 using st tags(8)
sql create table d2.c39 using st tags(9)
sql show d2.tables
print d2.st1.tables ==> $rows
$total = $remains + 10
if $rows != $total then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
......@@ -995,3 +995,22 @@ endi
if $data00 != 0.000000000 then
return -1
endi
sql create table ft1(ts timestamp, a int, b int , c int, d double);
sql insert into ft1 values(1648791213000,1,2,3,1.0);
sql_error select sum(_wduration), a from ft1 state_window(a);
sql_error select count(_wduration), a from ft1 state_window(a);
sql_error select max(_wduration), a from ft1 state_window(a);
sql_error select sum(1 + _wduration), a from ft1 state_window(a);
sql_error select sum(cast(_wstart as bigint)), a from ft1 state_window(a);
sql_error select sum(cast(_wend as bigint)), a from ft1 state_window(a);
sql_error create stream streams1 trigger at_once into streamt as select _wstart, sum(_wduration) from ft1 interval(10s);
sql_error create stream streams1 trigger at_once into streamt as select _wstart, sum(cast(_wend as bigint)) from ft1 interval(10s);
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册