提交 dc5235eb 编写于 作者: X Xiaoyu Wang

feat: add pause/resume stream statement

上级 641c707b
......@@ -211,140 +211,143 @@
#define TK_REPLACE 193
#define TK_STREAM 194
#define TK_INTO 195
#define TK_TRIGGER 196
#define TK_AT_ONCE 197
#define TK_WINDOW_CLOSE 198
#define TK_IGNORE 199
#define TK_EXPIRED 200
#define TK_FILL_HISTORY 201
#define TK_UPDATE 202
#define TK_SUBTABLE 203
#define TK_KILL 204
#define TK_CONNECTION 205
#define TK_TRANSACTION 206
#define TK_BALANCE 207
#define TK_VGROUP 208
#define TK_LEADER 209
#define TK_MERGE 210
#define TK_REDISTRIBUTE 211
#define TK_SPLIT 212
#define TK_DELETE 213
#define TK_INSERT 214
#define TK_NULL 215
#define TK_NK_QUESTION 216
#define TK_NK_ARROW 217
#define TK_ROWTS 218
#define TK_QSTART 219
#define TK_QEND 220
#define TK_QDURATION 221
#define TK_WSTART 222
#define TK_WEND 223
#define TK_WDURATION 224
#define TK_IROWTS 225
#define TK_ISFILLED 226
#define TK_CAST 227
#define TK_NOW 228
#define TK_TODAY 229
#define TK_TIMEZONE 230
#define TK_CLIENT_VERSION 231
#define TK_SERVER_VERSION 232
#define TK_SERVER_STATUS 233
#define TK_CURRENT_USER 234
#define TK_CASE 235
#define TK_WHEN 236
#define TK_THEN 237
#define TK_ELSE 238
#define TK_BETWEEN 239
#define TK_IS 240
#define TK_NK_LT 241
#define TK_NK_GT 242
#define TK_NK_LE 243
#define TK_NK_GE 244
#define TK_NK_NE 245
#define TK_MATCH 246
#define TK_NMATCH 247
#define TK_CONTAINS 248
#define TK_IN 249
#define TK_JOIN 250
#define TK_INNER 251
#define TK_SELECT 252
#define TK_DISTINCT 253
#define TK_WHERE 254
#define TK_PARTITION 255
#define TK_BY 256
#define TK_SESSION 257
#define TK_STATE_WINDOW 258
#define TK_EVENT_WINDOW 259
#define TK_SLIDING 260
#define TK_FILL 261
#define TK_VALUE 262
#define TK_VALUE_F 263
#define TK_NONE 264
#define TK_PREV 265
#define TK_NULL_F 266
#define TK_LINEAR 267
#define TK_NEXT 268
#define TK_HAVING 269
#define TK_RANGE 270
#define TK_EVERY 271
#define TK_ORDER 272
#define TK_SLIMIT 273
#define TK_SOFFSET 274
#define TK_LIMIT 275
#define TK_OFFSET 276
#define TK_ASC 277
#define TK_NULLS 278
#define TK_ABORT 279
#define TK_AFTER 280
#define TK_ATTACH 281
#define TK_BEFORE 282
#define TK_BEGIN 283
#define TK_BITAND 284
#define TK_BITNOT 285
#define TK_BITOR 286
#define TK_BLOCKS 287
#define TK_CHANGE 288
#define TK_COMMA 289
#define TK_CONCAT 290
#define TK_CONFLICT 291
#define TK_COPY 292
#define TK_DEFERRED 293
#define TK_DELIMITERS 294
#define TK_DETACH 295
#define TK_DIVIDE 296
#define TK_DOT 297
#define TK_EACH 298
#define TK_FAIL 299
#define TK_FILE 300
#define TK_FOR 301
#define TK_GLOB 302
#define TK_ID 303
#define TK_IMMEDIATE 304
#define TK_IMPORT 305
#define TK_INITIALLY 306
#define TK_INSTEAD 307
#define TK_ISNULL 308
#define TK_KEY 309
#define TK_MODULES 310
#define TK_NK_BITNOT 311
#define TK_NK_SEMI 312
#define TK_NOTNULL 313
#define TK_OF 314
#define TK_PLUS 315
#define TK_PRIVILEGE 316
#define TK_RAISE 317
#define TK_RESTRICT 318
#define TK_ROW 319
#define TK_SEMI 320
#define TK_STAR 321
#define TK_STATEMENT 322
#define TK_STRICT 323
#define TK_STRING 324
#define TK_TIMES 325
#define TK_VALUES 326
#define TK_VARIABLE 327
#define TK_VIEW 328
#define TK_WAL 329
#define TK_PAUSE 196
#define TK_RESUME 197
#define TK_TRIGGER 198
#define TK_AT_ONCE 199
#define TK_WINDOW_CLOSE 200
#define TK_IGNORE 201
#define TK_EXPIRED 202
#define TK_FILL_HISTORY 203
#define TK_UPDATE 204
#define TK_SUBTABLE 205
#define TK_UNTREATED 206
#define TK_KILL 207
#define TK_CONNECTION 208
#define TK_TRANSACTION 209
#define TK_BALANCE 210
#define TK_VGROUP 211
#define TK_LEADER 212
#define TK_MERGE 213
#define TK_REDISTRIBUTE 214
#define TK_SPLIT 215
#define TK_DELETE 216
#define TK_INSERT 217
#define TK_NULL 218
#define TK_NK_QUESTION 219
#define TK_NK_ARROW 220
#define TK_ROWTS 221
#define TK_QSTART 222
#define TK_QEND 223
#define TK_QDURATION 224
#define TK_WSTART 225
#define TK_WEND 226
#define TK_WDURATION 227
#define TK_IROWTS 228
#define TK_ISFILLED 229
#define TK_CAST 230
#define TK_NOW 231
#define TK_TODAY 232
#define TK_TIMEZONE 233
#define TK_CLIENT_VERSION 234
#define TK_SERVER_VERSION 235
#define TK_SERVER_STATUS 236
#define TK_CURRENT_USER 237
#define TK_CASE 238
#define TK_WHEN 239
#define TK_THEN 240
#define TK_ELSE 241
#define TK_BETWEEN 242
#define TK_IS 243
#define TK_NK_LT 244
#define TK_NK_GT 245
#define TK_NK_LE 246
#define TK_NK_GE 247
#define TK_NK_NE 248
#define TK_MATCH 249
#define TK_NMATCH 250
#define TK_CONTAINS 251
#define TK_IN 252
#define TK_JOIN 253
#define TK_INNER 254
#define TK_SELECT 255
#define TK_DISTINCT 256
#define TK_WHERE 257
#define TK_PARTITION 258
#define TK_BY 259
#define TK_SESSION 260
#define TK_STATE_WINDOW 261
#define TK_EVENT_WINDOW 262
#define TK_SLIDING 263
#define TK_FILL 264
#define TK_VALUE 265
#define TK_VALUE_F 266
#define TK_NONE 267
#define TK_PREV 268
#define TK_NULL_F 269
#define TK_LINEAR 270
#define TK_NEXT 271
#define TK_HAVING 272
#define TK_RANGE 273
#define TK_EVERY 274
#define TK_ORDER 275
#define TK_SLIMIT 276
#define TK_SOFFSET 277
#define TK_LIMIT 278
#define TK_OFFSET 279
#define TK_ASC 280
#define TK_NULLS 281
#define TK_ABORT 282
#define TK_AFTER 283
#define TK_ATTACH 284
#define TK_BEFORE 285
#define TK_BEGIN 286
#define TK_BITAND 287
#define TK_BITNOT 288
#define TK_BITOR 289
#define TK_BLOCKS 290
#define TK_CHANGE 291
#define TK_COMMA 292
#define TK_CONCAT 293
#define TK_CONFLICT 294
#define TK_COPY 295
#define TK_DEFERRED 296
#define TK_DELIMITERS 297
#define TK_DETACH 298
#define TK_DIVIDE 299
#define TK_DOT 300
#define TK_EACH 301
#define TK_FAIL 302
#define TK_FILE 303
#define TK_FOR 304
#define TK_GLOB 305
#define TK_ID 306
#define TK_IMMEDIATE 307
#define TK_IMPORT 308
#define TK_INITIALLY 309
#define TK_INSTEAD 310
#define TK_ISNULL 311
#define TK_KEY 312
#define TK_MODULES 313
#define TK_NK_BITNOT 314
#define TK_NK_SEMI 315
#define TK_NOTNULL 316
#define TK_OF 317
#define TK_PLUS 318
#define TK_PRIVILEGE 319
#define TK_RAISE 320
#define TK_RESTRICT 321
#define TK_ROW 322
#define TK_SEMI 323
#define TK_STAR 324
#define TK_STATEMENT 325
#define TK_STRICT 326
#define TK_STRING 327
#define TK_TIMES 328
#define TK_VALUES 329
#define TK_VARIABLE 330
#define TK_VIEW 331
#define TK_WAL 332
#define TK_NK_SPACE 600
#define TK_NK_COMMENT 601
......
......@@ -436,6 +436,19 @@ typedef struct SDropStreamStmt {
bool ignoreNotExists;
} SDropStreamStmt;
typedef struct SPauseStreamStmt {
ENodeType type;
char streamName[TSDB_TABLE_NAME_LEN];
bool ignoreNotExists;
} SPauseStreamStmt;
typedef struct SResumeStreamStmt {
ENodeType type;
char streamName[TSDB_TABLE_NAME_LEN];
bool ignoreNotExists;
bool ignoreUntreated;
} SResumeStreamStmt;
typedef struct SCreateFunctionStmt {
ENodeType type;
bool orReplace;
......
......@@ -211,6 +211,8 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_DB_ALIVE_STMT,
QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT,
QUERY_NODE_BALANCE_VGROUP_LEADER_STMT,
QUERY_NODE_PAUSE_STREAM_STMT,
QUERY_NODE_RESUME_STREAM_STMT,
// logic plan node
QUERY_NODE_LOGIC_PLAN_SCAN = 1000,
......
......@@ -171,6 +171,10 @@ const char* nodesNodeName(ENodeType type) {
return "CreateStreamStmt";
case QUERY_NODE_DROP_STREAM_STMT:
return "DropStreamStmt";
case QUERY_NODE_PAUSE_STREAM_STMT:
return "PauseStreamStmt";
case QUERY_NODE_RESUME_STREAM_STMT:
return "ResumeStreamStmt";
case QUERY_NODE_BALANCE_VGROUP_STMT:
return "BalanceVgroupStmt";
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
......
......@@ -384,6 +384,10 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SCreateStreamStmt));
case QUERY_NODE_DROP_STREAM_STMT:
return makeNode(type, sizeof(SDropStreamStmt));
case QUERY_NODE_PAUSE_STREAM_STMT:
return makeNode(type, sizeof(SPauseStreamStmt));
case QUERY_NODE_RESUME_STREAM_STMT:
return makeNode(type, sizeof(SResumeStreamStmt));
case QUERY_NODE_BALANCE_VGROUP_STMT:
return makeNode(type, sizeof(SBalanceVgroupStmt));
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT:
......@@ -944,6 +948,8 @@ void nodesDestroyNode(SNode* pNode) {
break;
}
case QUERY_NODE_DROP_STREAM_STMT: // no pointer field
case QUERY_NODE_PAUSE_STREAM_STMT: // no pointer field
case QUERY_NODE_RESUME_STREAM_STMT: // no pointer field
case QUERY_NODE_BALANCE_VGROUP_STMT: // no pointer field
case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: // no pointer field
case QUERY_NODE_MERGE_VGROUP_STMT: // no pointer field
......
......@@ -226,6 +226,8 @@ SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptions
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pStreamName, SNode* pRealTable,
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols);
SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName);
SNode* createPauseStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName);
SNode* createResumeStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, bool ignoreUntreated, SToken* pStreamName);
SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId);
SNode* createKillQueryStmt(SAstCreateContext* pCxt, const SToken* pQueryId);
SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt);
......
......@@ -587,6 +587,8 @@ cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) stream_options(B) INTO
full_table_name(C) col_list_opt(H) tag_def_or_ref_opt(F) subtable_opt(G)
AS query_or_subquery(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, F, G, D, H); }
cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); }
cmd ::= PAUSE STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createPauseStreamStmt(pCxt, A, &B); }
cmd ::= RESUME STREAM exists_opt(A) ignore_opt(C) stream_name(B). { pCxt->pRootNode = createResumeStreamStmt(pCxt, A, C, &B); }
%type col_list_opt { SNodeList* }
%destructor col_list_opt { nodesDestroyList($$); }
......@@ -612,6 +614,11 @@ stream_options(A) ::= stream_options(B) IGNORE UPDATE NK_INTEGER(C).
subtable_opt(A) ::= . { A = NULL; }
subtable_opt(A) ::= SUBTABLE NK_LP expression(B) NK_RP. { A = releaseRawExprNode(pCxt, B); }
%type ignore_opt { bool }
%destructor ignore_opt { }
ignore_opt(A) ::= . { A = false; }
ignore_opt(A) ::= IGNORE UNTREATED. { A = true; }
/************************************************ kill connection/query ***********************************************/
cmd ::= KILL CONNECTION NK_INTEGER(A). { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_CONNECTION_STMT, &A); }
cmd ::= KILL QUERY NK_STRING(A). { pCxt->pRootNode = createKillQueryStmt(pCxt, &A); }
......
......@@ -1025,23 +1025,23 @@ static SNode* setDatabaseOptionImpl(SAstCreateContext* pCxt, SNode* pOptions, ED
pDbOptions->sstTrigger = taosStr2Int32(((SToken*)pVal)->z, NULL, 10);
break;
case DB_OPTION_TABLE_PREFIX: {
SValueNode *pNode = (SValueNode *)pVal;
SValueNode* pNode = (SValueNode*)pVal;
if (TSDB_DATA_TYPE_BIGINT == pNode->node.resType.type || TSDB_DATA_TYPE_UBIGINT == pNode->node.resType.type) {
pDbOptions->tablePrefix = taosStr2Int32(pNode->literal, NULL, 10);
} else {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "invalid table_prefix data type");
pCxt->errCode = TSDB_CODE_PAR_SYNTAX_ERROR;
pCxt->errCode = TSDB_CODE_PAR_SYNTAX_ERROR;
}
nodesDestroyNode((SNode*)pNode);
break;
}
case DB_OPTION_TABLE_SUFFIX:{
SValueNode *pNode = (SValueNode *)pVal;
case DB_OPTION_TABLE_SUFFIX: {
SValueNode* pNode = (SValueNode*)pVal;
if (TSDB_DATA_TYPE_BIGINT == pNode->node.resType.type || TSDB_DATA_TYPE_UBIGINT == pNode->node.resType.type) {
pDbOptions->tableSuffix = taosStr2Int32(pNode->literal, NULL, 10);
} else {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "invalid table_suffix data type");
pCxt->errCode = TSDB_CODE_PAR_SYNTAX_ERROR;
pCxt->errCode = TSDB_CODE_PAR_SYNTAX_ERROR;
}
nodesDestroyNode((SNode*)pNode);
break;
......@@ -1947,6 +1947,32 @@ SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToke
return (SNode*)pStmt;
}
SNode* createPauseStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName) {
CHECK_PARSER_STATUS(pCxt);
if (!checkStreamName(pCxt, pStreamName)) {
return NULL;
}
SPauseStreamStmt* pStmt = (SPauseStreamStmt*)nodesMakeNode(QUERY_NODE_PAUSE_STREAM_STMT);
CHECK_OUT_OF_MEM(pStmt);
COPY_STRING_FORM_ID_TOKEN(pStmt->streamName, pStreamName);
pStmt->ignoreNotExists = ignoreNotExists;
return (SNode*)pStmt;
}
SNode* createResumeStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, bool ignoreUntreated,
SToken* pStreamName) {
CHECK_PARSER_STATUS(pCxt);
if (!checkStreamName(pCxt, pStreamName)) {
return NULL;
}
SResumeStreamStmt* pStmt = (SResumeStreamStmt*)nodesMakeNode(QUERY_NODE_RESUME_STREAM_STMT);
CHECK_OUT_OF_MEM(pStmt);
COPY_STRING_FORM_ID_TOKEN(pStmt->streamName, pStreamName);
pStmt->ignoreNotExists = ignoreNotExists;
pStmt->ignoreUntreated = ignoreUntreated;
return (SNode*)pStmt;
}
SNode* createKillStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pId) {
CHECK_PARSER_STATUS(pCxt);
SKillStmt* pStmt = (SKillStmt*)nodesMakeNode(type);
......
......@@ -175,12 +175,14 @@ static SKeyword keywordTable[] = {
{"QUERY", TK_QUERY},
{"RANGE", TK_RANGE},
{"RATIO", TK_RATIO},
{"PAUSE", TK_PAUSE},
{"READ", TK_READ},
{"REDISTRIBUTE", TK_REDISTRIBUTE},
{"RENAME", TK_RENAME},
{"REPLACE", TK_REPLACE},
{"REPLICA", TK_REPLICA},
{"RESET", TK_RESET},
{"RESUME", TK_RESUME},
{"RETENTIONS", TK_RETENTIONS},
{"REVOKE", TK_REVOKE},
{"ROLLUP", TK_ROLLUP},
......@@ -239,6 +241,7 @@ static SKeyword keywordTable[] = {
{"TTL", TK_TTL},
{"UNION", TK_UNION},
{"UNSIGNED", TK_UNSIGNED},
{"UNTREATED", TK_UNTREATED},
{"UPDATE", TK_UPDATE},
{"USE", TK_USE},
{"USER", TK_USER},
......
......@@ -4184,7 +4184,6 @@ static int32_t checkDbRetentionsOption(STranslateContext* pCxt, SNodeList* pRete
return TSDB_CODE_SUCCESS;
}
static int32_t checkDbTbPrefixSuffixOptions(STranslateContext* pCxt, int32_t tbPrefix, int32_t tbSuffix) {
if (tbPrefix < TSDB_MIN_HASH_PREFIX || tbPrefix > TSDB_MAX_HASH_PREFIX) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
......@@ -4211,7 +4210,6 @@ static int32_t checkDbTbPrefixSuffixOptions(STranslateContext* pCxt, int32_t tbP
return TSDB_CODE_SUCCESS;
}
static int32_t checkOptionsDependency(STranslateContext* pCxt, const char* pDbName, SDatabaseOptions* pOptions) {
int32_t daysPerFile = pOptions->daysPerFile;
int64_t daysToKeep0 = pOptions->keep[0];
......@@ -6522,6 +6520,25 @@ static int32_t translateDropStream(STranslateContext* pCxt, SDropStreamStmt* pSt
return buildCmdMsg(pCxt, TDMT_MND_DROP_STREAM, (FSerializeFunc)tSerializeSMDropStreamReq, &dropReq);
}
static int32_t translatePauseStream(STranslateContext* pCxt, SPauseStreamStmt* pStmt) {
SMPauseStreamReq req = {0};
SName name;
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->streamName, strlen(pStmt->streamName));
tNameGetFullDbName(&name, req.name);
req.igNotExists = pStmt->ignoreNotExists;
return buildCmdMsg(pCxt, TDMT_MND_PAUSE_STREAM, (FSerializeFunc)tSerializeSMPauseStreamReq, &req);
}
static int32_t translateResumeStream(STranslateContext* pCxt, SResumeStreamStmt* pStmt) {
SMResumeStreamReq req = {0};
SName name;
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->streamName, strlen(pStmt->streamName));
tNameGetFullDbName(&name, req.name);
req.igNotExists = pStmt->ignoreNotExists;
req.igUntreated = pStmt->ignoreUntreated;
return buildCmdMsg(pCxt, TDMT_MND_RESUME_STREAM, (FSerializeFunc)tSerializeSMResumeStreamReq, &req);
}
static int32_t readFromFile(char* pName, int32_t* len, char** buf) {
int64_t filesize = 0;
if (taosStatFile(pName, &filesize, NULL) < 0) {
......@@ -6885,6 +6902,12 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_DROP_STREAM_STMT:
code = translateDropStream(pCxt, (SDropStreamStmt*)pNode);
break;
case QUERY_NODE_PAUSE_STREAM_STMT:
code = translatePauseStream(pCxt, (SPauseStreamStmt*)pNode);
break;
case QUERY_NODE_RESUME_STREAM_STMT:
code = translateResumeStream(pCxt, (SResumeStreamStmt*)pNode);
break;
case QUERY_NODE_CREATE_FUNCTION_STMT:
code = translateCreateFunction(pCxt, (SCreateFunctionStmt*)pNode);
break;
......
因为 它太大了无法显示 source diff 。你可以改为 查看blob
......@@ -101,6 +101,60 @@ TEST_F(ParserExplainToSyncdbTest, mergeVgroup) {
run("MERGE VGROUP 1 2");
}
TEST_F(ParserExplainToSyncdbTest, pauseStreamStmt) {
useDb("root", "test");
SMPauseStreamReq expect = {0};
auto setMPauseStreamReq = [&](const string& name, bool igNotExists = false) {
snprintf(expect.name, sizeof(expect.name), "0.%s", name.c_str());
expect.igNotExists = igNotExists;
};
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_PAUSE_STREAM_STMT);
ASSERT_EQ(pQuery->pCmdMsg->msgType, TDMT_MND_PAUSE_STREAM);
SMPauseStreamReq req = {0};
ASSERT_EQ(tDeserializeSMPauseStreamReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req), TSDB_CODE_SUCCESS);
ASSERT_EQ(string(req.name), string(expect.name));
ASSERT_EQ(req.igNotExists, expect.igNotExists);
});
setMPauseStreamReq("str1");
run("PAUSE STREAM str1");
setMPauseStreamReq("str2", true);
run("PAUSE STREAM IF EXISTS str2");
}
TEST_F(ParserExplainToSyncdbTest, resumeStreamStmt) {
useDb("root", "test");
SMResumeStreamReq expect = {0};
auto setMResumeStreamReq = [&](const string& name, bool igNotExists = false, bool igUntreated = false) {
snprintf(expect.name, sizeof(expect.name), "0.%s", name.c_str());
expect.igNotExists = igNotExists;
expect.igUntreated = igUntreated;
};
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_RESUME_STREAM_STMT);
ASSERT_EQ(pQuery->pCmdMsg->msgType, TDMT_MND_RESUME_STREAM);
SMResumeStreamReq req = {0};
ASSERT_EQ(tDeserializeSMResumeStreamReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req), TSDB_CODE_SUCCESS);
ASSERT_EQ(string(req.name), string(expect.name));
ASSERT_EQ(req.igNotExists, expect.igNotExists);
ASSERT_EQ(req.igUntreated, expect.igUntreated);
});
setMResumeStreamReq("str1");
run("RESUME STREAM str1");
setMResumeStreamReq("str2", true, true);
run("RESUME STREAM IF EXISTS IGNORE UNTREATED str2");
}
TEST_F(ParserExplainToSyncdbTest, redistributeVgroup) {
useDb("root", "test");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册