提交 1530b339 编写于 作者: X Xiaoyu Wang

feat: support 'select *, expr from ...' syntax

上级 6141bb03
......@@ -1623,6 +1623,7 @@ typedef struct {
int8_t triggerType;
int64_t maxDelay;
int64_t watermark;
int8_t igExpired;
} SCMCreateStreamReq;
typedef struct {
......
......@@ -192,79 +192,81 @@
#define TK_TRIGGER 174
#define TK_AT_ONCE 175
#define TK_WINDOW_CLOSE 176
#define TK_KILL 177
#define TK_CONNECTION 178
#define TK_TRANSACTION 179
#define TK_BALANCE 180
#define TK_VGROUP 181
#define TK_MERGE 182
#define TK_REDISTRIBUTE 183
#define TK_SPLIT 184
#define TK_SYNCDB 185
#define TK_DELETE 186
#define TK_NULL 187
#define TK_NK_QUESTION 188
#define TK_NK_ARROW 189
#define TK_ROWTS 190
#define TK_TBNAME 191
#define TK_QSTARTTS 192
#define TK_QENDTS 193
#define TK_WSTARTTS 194
#define TK_WENDTS 195
#define TK_WDURATION 196
#define TK_CAST 197
#define TK_NOW 198
#define TK_TODAY 199
#define TK_TIMEZONE 200
#define TK_CLIENT_VERSION 201
#define TK_SERVER_VERSION 202
#define TK_SERVER_STATUS 203
#define TK_CURRENT_USER 204
#define TK_COUNT 205
#define TK_LAST_ROW 206
#define TK_BETWEEN 207
#define TK_IS 208
#define TK_NK_LT 209
#define TK_NK_GT 210
#define TK_NK_LE 211
#define TK_NK_GE 212
#define TK_NK_NE 213
#define TK_MATCH 214
#define TK_NMATCH 215
#define TK_CONTAINS 216
#define TK_JOIN 217
#define TK_INNER 218
#define TK_SELECT 219
#define TK_DISTINCT 220
#define TK_WHERE 221
#define TK_PARTITION 222
#define TK_BY 223
#define TK_SESSION 224
#define TK_STATE_WINDOW 225
#define TK_SLIDING 226
#define TK_FILL 227
#define TK_VALUE 228
#define TK_NONE 229
#define TK_PREV 230
#define TK_LINEAR 231
#define TK_NEXT 232
#define TK_HAVING 233
#define TK_RANGE 234
#define TK_EVERY 235
#define TK_ORDER 236
#define TK_SLIMIT 237
#define TK_SOFFSET 238
#define TK_LIMIT 239
#define TK_OFFSET 240
#define TK_ASC 241
#define TK_NULLS 242
#define TK_ID 243
#define TK_NK_BITNOT 244
#define TK_INSERT 245
#define TK_VALUES 246
#define TK_IMPORT 247
#define TK_NK_SEMI 248
#define TK_FILE 249
#define TK_IGNORE 177
#define TK_EXPIRED 178
#define TK_KILL 179
#define TK_CONNECTION 180
#define TK_TRANSACTION 181
#define TK_BALANCE 182
#define TK_VGROUP 183
#define TK_MERGE 184
#define TK_REDISTRIBUTE 185
#define TK_SPLIT 186
#define TK_SYNCDB 187
#define TK_DELETE 188
#define TK_NULL 189
#define TK_NK_QUESTION 190
#define TK_NK_ARROW 191
#define TK_ROWTS 192
#define TK_TBNAME 193
#define TK_QSTARTTS 194
#define TK_QENDTS 195
#define TK_WSTARTTS 196
#define TK_WENDTS 197
#define TK_WDURATION 198
#define TK_CAST 199
#define TK_NOW 200
#define TK_TODAY 201
#define TK_TIMEZONE 202
#define TK_CLIENT_VERSION 203
#define TK_SERVER_VERSION 204
#define TK_SERVER_STATUS 205
#define TK_CURRENT_USER 206
#define TK_COUNT 207
#define TK_LAST_ROW 208
#define TK_BETWEEN 209
#define TK_IS 210
#define TK_NK_LT 211
#define TK_NK_GT 212
#define TK_NK_LE 213
#define TK_NK_GE 214
#define TK_NK_NE 215
#define TK_MATCH 216
#define TK_NMATCH 217
#define TK_CONTAINS 218
#define TK_JOIN 219
#define TK_INNER 220
#define TK_SELECT 221
#define TK_DISTINCT 222
#define TK_WHERE 223
#define TK_PARTITION 224
#define TK_BY 225
#define TK_SESSION 226
#define TK_STATE_WINDOW 227
#define TK_SLIDING 228
#define TK_FILL 229
#define TK_VALUE 230
#define TK_NONE 231
#define TK_PREV 232
#define TK_LINEAR 233
#define TK_NEXT 234
#define TK_HAVING 235
#define TK_RANGE 236
#define TK_EVERY 237
#define TK_ORDER 238
#define TK_SLIMIT 239
#define TK_SOFFSET 240
#define TK_LIMIT 241
#define TK_OFFSET 242
#define TK_ASC 243
#define TK_NULLS 244
#define TK_ID 245
#define TK_NK_BITNOT 246
#define TK_INSERT 247
#define TK_VALUES 248
#define TK_IMPORT 249
#define TK_NK_SEMI 250
#define TK_FILE 251
#define TK_NK_SPACE 300
#define TK_NK_COMMENT 301
......
......@@ -338,6 +338,7 @@ typedef struct SStreamOptions {
int8_t triggerType;
SNode* pDelay;
SNode* pWatermark;
bool ignoreExpired;
} SStreamOptions;
typedef struct SCreateStreamStmt {
......
......@@ -73,8 +73,7 @@ typedef struct SScanLogicNode {
SNode* pTagIndexCond;
int8_t triggerType;
int64_t watermark;
int16_t tsColId;
double filesFactor;
int8_t igExpired;
SArray* pSmaIndexes;
SNodeList* pGroupTags;
bool groupSort;
......@@ -175,7 +174,7 @@ typedef struct SWindowLogicNode {
SNode* pStateExpr;
int8_t triggerType;
int64_t watermark;
double filesFactor;
int8_t igExpired;
EWindowAlgorithm windowAlgo;
} SWindowLogicNode;
......@@ -296,8 +295,7 @@ typedef struct STableScanPhysiNode {
int8_t slidingUnit;
int8_t triggerType;
int64_t watermark;
int16_t tsColId;
double filesFactor;
int8_t igExpired;
} STableScanPhysiNode;
typedef STableScanPhysiNode STableSeqScanPhysiNode;
......@@ -373,7 +371,7 @@ typedef struct SWinodwPhysiNode {
SNode* pTsEnd; // window end timestamp
int8_t triggerType;
int64_t watermark;
double filesFactor;
int8_t igExpired;
} SWinodwPhysiNode;
typedef struct SIntervalPhysiNode {
......
......@@ -34,6 +34,7 @@ typedef struct SPlanContext {
bool showRewrite;
int8_t triggerType;
int64_t watermark;
int8_t igExpired;
char* pMsg;
int32_t msgLen;
const char* pUser;
......
......@@ -4641,6 +4641,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeI8(&encoder, pReq->triggerType) < 0) return -1;
if (tEncodeI64(&encoder, pReq->maxDelay) < 0) return -1;
if (tEncodeI64(&encoder, pReq->watermark) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igExpired) < 0) return -1;
if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1;
......@@ -4668,6 +4669,7 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
if (tDecodeI8(&decoder, &pReq->triggerType) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->maxDelay) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->watermark) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igExpired) < 0) return -1;
if (sqlLen > 0) {
pReq->sql = taosMemoryCalloc(1, sqlLen + 1);
......@@ -5502,4 +5504,3 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
}
return 0;
}
......@@ -1188,7 +1188,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
int16_t colId = id->colId;
taosArrayPush(pColIds, &colId);
if (id->colId == pTableScanNode->tsColId) {
if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
pInfo->primaryTsIndex = id->targetSlotId;
}
}
......
......@@ -350,9 +350,9 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
CLONE_NODE_FIELD(pTagIndexCond);
COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(tsColId);
COPY_SCALAR_FIELD(filesFactor);
COPY_SCALAR_FIELD(igExpired);
CLONE_NODE_LIST_FIELD(pGroupTags);
COPY_SCALAR_FIELD(groupSort);
return TSDB_CODE_SUCCESS;
}
......@@ -421,7 +421,7 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p
CLONE_NODE_FIELD(pStateExpr);
COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(filesFactor);
COPY_SCALAR_FIELD(igExpired);
COPY_SCALAR_FIELD(windowAlgo);
return TSDB_CODE_SUCCESS;
}
......@@ -511,8 +511,7 @@ static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhy
COPY_SCALAR_FIELD(slidingUnit);
COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(tsColId);
COPY_SCALAR_FIELD(filesFactor);
COPY_SCALAR_FIELD(igExpired);
return TSDB_CODE_SUCCESS;
}
......@@ -532,7 +531,7 @@ static int32_t physiWindowCopy(const SWinodwPhysiNode* pSrc, SWinodwPhysiNode* p
CLONE_NODE_FIELD(pTsEnd);
COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(filesFactor);
COPY_SCALAR_FIELD(igExpired);
return TSDB_CODE_SUCCESS;
}
......
......@@ -1426,12 +1426,11 @@ static const char* jkTableScanPhysiPlanDynamicScanFuncs = "DynamicScanFuncs";
static const char* jkTableScanPhysiPlanInterval = "Interval";
static const char* jkTableScanPhysiPlanOffset = "Offset";
static const char* jkTableScanPhysiPlanSliding = "Sliding";
static const char* jkTableScanPhysiPlanIntervalUnit = "intervalUnit";
static const char* jkTableScanPhysiPlanSlidingUnit = "slidingUnit";
static const char* jkTableScanPhysiPlanTriggerType = "triggerType";
static const char* jkTableScanPhysiPlanWatermark = "watermark";
static const char* jkTableScanPhysiPlanTsColId = "tsColId";
static const char* jkTableScanPhysiPlanFilesFactor = "FilesFactor";
static const char* jkTableScanPhysiPlanIntervalUnit = "IntervalUnit";
static const char* jkTableScanPhysiPlanSlidingUnit = "SlidingUnit";
static const char* jkTableScanPhysiPlanTriggerType = "TriggerType";
static const char* jkTableScanPhysiPlanWatermark = "Watermark";
static const char* jkTableScanPhysiPlanIgnoreExpired = "IgnoreExpired";
static const char* jkTableScanPhysiPlanGroupTags = "GroupTags";
static const char* jkTableScanPhysiPlanGroupSort = "GroupSort";
......@@ -1482,10 +1481,7 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanWatermark, pNode->watermark);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanTsColId, pNode->tsColId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddDoubleToObject(pJson, jkTableScanPhysiPlanFilesFactor, pNode->filesFactor);
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanIgnoreExpired, pNode->igExpired);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkTableScanPhysiPlanGroupTags, pNode->pGroupTags);
......@@ -1517,37 +1513,34 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
code = tjsonGetDoubleValue(pJson, jkTableScanPhysiPlanRatio, &pNode->ratio);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanDataRequired, pNode->dataRequired, code);
code = tjsonGetIntValue(pJson, jkTableScanPhysiPlanStartKey, &pNode->dataRequired);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkTableScanPhysiPlanDynamicScanFuncs, &pNode->pDynamicScanFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanInterval, pNode->interval, code);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanOffset, pNode->offset, code);
code = tjsonGetBigIntValue(pJson, jkTableScanPhysiPlanStartKey, &pNode->interval);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSliding, pNode->sliding, code);
code = tjsonGetBigIntValue(pJson, jkTableScanPhysiPlanStartKey, &pNode->offset);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanIntervalUnit, pNode->intervalUnit, code);
code = tjsonGetBigIntValue(pJson, jkTableScanPhysiPlanStartKey, &pNode->sliding);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanSlidingUnit, pNode->slidingUnit, code);
code = tjsonGetTinyIntValue(pJson, jkTableScanPhysiPlanStartKey, &pNode->intervalUnit);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanTriggerType, pNode->triggerType, code);
code = tjsonGetTinyIntValue(pJson, jkTableScanPhysiPlanStartKey, &pNode->slidingUnit);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanWatermark, pNode->watermark, code);
code = tjsonGetTinyIntValue(pJson, jkTableScanPhysiPlanStartKey, &pNode->triggerType);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanTsColId, pNode->tsColId, code);
code = tjsonGetBigIntValue(pJson, jkTableScanPhysiPlanStartKey, &pNode->watermark);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetDoubleValue(pJson, jkTableScanPhysiPlanFilesFactor, &pNode->filesFactor);
code = tjsonGetTinyIntValue(pJson, jkTableScanPhysiPlanStartKey, &pNode->igExpired);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkTableScanPhysiPlanGroupTags, &pNode->pGroupTags);
......@@ -1826,7 +1819,7 @@ static const char* jkWindowPhysiPlanTsPk = "TsPk";
static const char* jkWindowPhysiPlanTsEnd = "TsEnd";
static const char* jkWindowPhysiPlanTriggerType = "TriggerType";
static const char* jkWindowPhysiPlanWatermark = "Watermark";
static const char* jkWindowPhysiPlanFilesFactor = "FilesFactor";
static const char* JKWINDOWPHYSIPLANIGNOREEXPIRED = "IgnoreExpired";
static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
const SWinodwPhysiNode* pNode = (const SWinodwPhysiNode*)pObj;
......@@ -1851,7 +1844,7 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanWatermark, pNode->watermark);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddDoubleToObject(pJson, jkWindowPhysiPlanFilesFactor, pNode->filesFactor);
code = tjsonAddIntegerToObject(pJson, JKWINDOWPHYSIPLANIGNOREEXPIRED, pNode->igExpired);
}
return code;
......@@ -1874,15 +1867,13 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) {
code = jsonToNodeObject(pJson, jkWindowPhysiPlanTsEnd, (SNode**)&pNode->pTsEnd);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkWindowPhysiPlanTriggerType, pNode->triggerType, code);
;
code = tjsonGetTinyIntValue(pJson, JKWINDOWPHYSIPLANIGNOREEXPIRED, &pNode->triggerType);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkWindowPhysiPlanWatermark, pNode->watermark, code);
;
code = tjsonGetBigIntValue(pJson, JKWINDOWPHYSIPLANIGNOREEXPIRED, &pNode->watermark);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetDoubleValue(pJson, jkWindowPhysiPlanFilesFactor, &pNode->filesFactor);
code = tjsonGetTinyIntValue(pJson, JKWINDOWPHYSIPLANIGNOREEXPIRED, &pNode->igExpired);
}
return code;
......
......@@ -488,6 +488,7 @@ stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE.
stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; A = B; }
stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY duration_literal(C). { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_MAX_DELAY; ((SStreamOptions*)B)->pDelay = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) IGNORE EXPIRED. { ((SStreamOptions*)B)->ignoreExpired = true; A = B; }
/************************************************ kill connection/query ***********************************************/
cmd ::= KILL CONNECTION NK_INTEGER(A). { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_CONNECTION_STMT, &A); }
......
......@@ -84,6 +84,7 @@ static SKeyword keywordTable[] = {
{"DURATION", TK_DURATION},
{"ENABLE", TK_ENABLE},
{"EXISTS", TK_EXISTS},
{"EXPIRED", TK_EXPIRED},
{"EXPLAIN", TK_EXPLAIN},
{"EVERY", TK_EVERY},
{"FILE", TK_FILE},
......@@ -99,6 +100,7 @@ static SKeyword keywordTable[] = {
{"GROUP", TK_GROUP},
{"HAVING", TK_HAVING},
{"IF", TK_IF},
{"IGNORE", TK_IGNORE},
{"IMPORT", TK_IMPORT},
{"IN", TK_IN},
{"INDEX", TK_INDEX},
......@@ -290,7 +292,6 @@ static SKeyword keywordTable[] = {
// {"END", TK_END},
// {"FAIL", TK_FAIL},
// {"FOR", TK_FOR},
// {"IGNORE", TK_IGNORE},
// {"IMMEDIATE", TK_IMMEDIATE},
// {"INITIALLY", TK_INITIALLY},
// {"INSTEAD", TK_INSTEAD},
......
......@@ -4208,6 +4208,7 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
pReq->triggerType = pStmt->pOptions->triggerType;
pReq->maxDelay = (NULL != pStmt->pOptions->pDelay ? ((SValueNode*)pStmt->pOptions->pDelay)->datum.i : 0);
pReq->watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0);
pReq->igExpired = pStmt->pOptions->ignoreExpired;
}
return code;
......@@ -4798,6 +4799,15 @@ static const char* getSysTableName(ENodeType type) {
return NULL;
}
static SNode* createStarCol() {
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
return NULL;
}
strcpy(pCol->colName, "*");
return (SNode*)pCol;
}
static int32_t createSimpleSelectStmt(const char* pDb, const char* pTable, SSelectStmt** pStmt) {
SSelectStmt* pSelect = (SSelectStmt*)nodesMakeNode(QUERY_NODE_SELECT_STMT);
if (NULL == pSelect) {
......@@ -4815,6 +4825,11 @@ static int32_t createSimpleSelectStmt(const char* pDb, const char* pTable, SSele
strcpy(pRealTable->table.tableAlias, pTable);
pSelect->pFromTable = (SNode*)pRealTable;
if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pSelect->pProjectionList, createStarCol())) {
nodesDestroyNode((SNode*)pSelect);
return TSDB_CODE_OUT_OF_MEMORY;
}
*pStmt = pSelect;
return TSDB_CODE_SUCCESS;
......@@ -4963,6 +4978,7 @@ static int32_t rewriteShowTableDist(STranslateContext* pCxt, SQuery* pQuery) {
SSelectStmt* pStmt = NULL;
int32_t code = createSelectStmtForShowTableDist((SShowTableDistributedStmt*)pQuery->pRoot, &pStmt);
if (TSDB_CODE_SUCCESS == code) {
NODES_DESTORY_LIST(pStmt->pProjectionList);
code = nodesListMakeStrictAppend(&pStmt->pProjectionList, createBlockDistFunc());
}
if (TSDB_CODE_SUCCESS == code) {
......
此差异已折叠。
......@@ -526,20 +526,22 @@ TEST_F(ParserInitialCTest, createStream) {
memset(&expect, 0, sizeof(SCMCreateStreamReq));
};
auto setCreateStreamReqFunc =
[&](const char* pStream, const char* pSrcDb, const char* pSql, const char* pDstStb = nullptr, int8_t igExists = 0,
int8_t triggerType = STREAM_TRIGGER_AT_ONCE, int64_t maxDelay = 0, int64_t watermark = 0) {
snprintf(expect.name, sizeof(expect.name), "0.%s", pStream);
snprintf(expect.sourceDB, sizeof(expect.sourceDB), "0.%s", pSrcDb);
if (NULL != pDstStb) {
snprintf(expect.targetStbFullName, sizeof(expect.targetStbFullName), "0.test.%s", pDstStb);
}
expect.igExists = igExists;
expect.sql = strdup(pSql);
expect.triggerType = triggerType;
expect.maxDelay = maxDelay;
expect.watermark = watermark;
};
auto setCreateStreamReqFunc = [&](const char* pStream, const char* pSrcDb, const char* pSql,
const char* pDstStb = nullptr, int8_t igExists = 0,
int8_t triggerType = STREAM_TRIGGER_AT_ONCE, int64_t maxDelay = 0,
int64_t watermark = 0, int8_t igExpired = 0) {
snprintf(expect.name, sizeof(expect.name), "0.%s", pStream);
snprintf(expect.sourceDB, sizeof(expect.sourceDB), "0.%s", pSrcDb);
if (NULL != pDstStb) {
snprintf(expect.targetStbFullName, sizeof(expect.targetStbFullName), "0.test.%s", pDstStb);
}
expect.igExists = igExists;
expect.sql = strdup(pSql);
expect.triggerType = triggerType;
expect.maxDelay = maxDelay;
expect.watermark = watermark;
expect.igExpired = igExpired;
};
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_CREATE_STREAM_STMT);
......@@ -555,6 +557,7 @@ TEST_F(ParserInitialCTest, createStream) {
ASSERT_EQ(req.triggerType, expect.triggerType);
ASSERT_EQ(req.maxDelay, expect.maxDelay);
ASSERT_EQ(req.watermark, expect.watermark);
ASSERT_EQ(req.igExpired, expect.igExpired);
tFreeSCMCreateStreamReq(&req);
});
......@@ -571,9 +574,10 @@ TEST_F(ParserInitialCTest, createStream) {
clearCreateStreamReq();
setCreateStreamReqFunc(
"s1", "test", "create stream if not exists s1 trigger max_delay 20s watermark 10s into st1 as select * from t1",
"st1", 1, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND);
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s INTO st1 AS SELECT * FROM t1");
"s1", "test",
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired into st1 as select * from t1",
"st1", 1, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND, 1);
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED INTO st1 AS SELECT * FROM t1");
clearCreateStreamReq();
}
......
......@@ -614,10 +614,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
if (pCxt->pPlanCxt->streamQuery) {
pWindow->triggerType = pCxt->pPlanCxt->triggerType;
pWindow->watermark = pCxt->pPlanCxt->watermark;
}
if (pCxt->pPlanCxt->rSmaQuery) {
/*pWindow->filesFactor = pCxt->pPlanCxt->filesFactor;*/
pWindow->igExpired = pCxt->pPlanCxt->igExpired;
}
if (TSDB_CODE_SUCCESS == code) {
......
......@@ -227,8 +227,7 @@ static void scanPathOptSetScanWin(SScanLogicNode* pScan) {
pScan->slidingUnit = ((SWindowLogicNode*)pParent)->slidingUnit;
pScan->triggerType = ((SWindowLogicNode*)pParent)->triggerType;
pScan->watermark = ((SWindowLogicNode*)pParent)->watermark;
pScan->tsColId = ((SColumnNode*)((SWindowLogicNode*)pParent)->pTspk)->colId;
pScan->filesFactor = ((SWindowLogicNode*)pParent)->filesFactor;
pScan->igExpired = ((SWindowLogicNode*)pParent)->igExpired;
}
}
......
......@@ -534,8 +534,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
pTableScan->slidingUnit = pScanLogicNode->slidingUnit;
pTableScan->triggerType = pScanLogicNode->triggerType;
pTableScan->watermark = pScanLogicNode->watermark;
pTableScan->tsColId = pScanLogicNode->tsColId;
pTableScan->filesFactor = pScanLogicNode->filesFactor;
pTableScan->igExpired = pScanLogicNode->igExpired;
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
}
......@@ -1054,7 +1053,7 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
pWindow->triggerType = pWindowLogicNode->triggerType;
pWindow->watermark = pWindowLogicNode->watermark;
pWindow->filesFactor = pWindowLogicNode->filesFactor;
pWindow->igExpired = pWindowLogicNode->igExpired;
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pWindow;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册