未验证 提交 96561ac3 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #19887 from taosdata/feat/ly_ig_udpate

feat(stream):add ignore check update option for create stream
...@@ -1754,6 +1754,7 @@ typedef struct { ...@@ -1754,6 +1754,7 @@ typedef struct {
#define STREAM_FILL_HISTORY_ON 1 #define STREAM_FILL_HISTORY_ON 1
#define STREAM_FILL_HISTORY_OFF 0 #define STREAM_FILL_HISTORY_OFF 0
#define STREAM_DEFAULT_FILL_HISTORY STREAM_FILL_HISTORY_OFF #define STREAM_DEFAULT_FILL_HISTORY STREAM_FILL_HISTORY_OFF
#define STREAM_DEFAULT_IGNORE_UPDATE 0
typedef struct { typedef struct {
char name[TSDB_STREAM_FNAME_LEN]; char name[TSDB_STREAM_FNAME_LEN];
...@@ -1771,6 +1772,7 @@ typedef struct { ...@@ -1771,6 +1772,7 @@ typedef struct {
SArray* pTags; // array of SField SArray* pTags; // array of SField
// 3.0.20 // 3.0.20
int64_t checkpointFreq; // ms int64_t checkpointFreq; // ms
int8_t igUpdate;
} SCMCreateStreamReq; } SCMCreateStreamReq;
typedef struct { typedef struct {
......
此差异已折叠。
...@@ -389,6 +389,7 @@ typedef struct SStreamOptions { ...@@ -389,6 +389,7 @@ typedef struct SStreamOptions {
SNode* pDeleteMark; SNode* pDeleteMark;
int8_t fillHistory; int8_t fillHistory;
int8_t ignoreExpired; int8_t ignoreExpired;
int8_t ignoreUpdate;
} SStreamOptions; } SStreamOptions;
typedef struct SCreateStreamStmt { typedef struct SCreateStreamStmt {
......
...@@ -93,6 +93,7 @@ typedef struct SScanLogicNode { ...@@ -93,6 +93,7 @@ typedef struct SScanLogicNode {
int64_t watermark; int64_t watermark;
int64_t deleteMark; int64_t deleteMark;
int8_t igExpired; int8_t igExpired;
int8_t igCheckUpdate;
SArray* pSmaIndexes; SArray* pSmaIndexes;
SNodeList* pGroupTags; SNodeList* pGroupTags;
bool groupSort; bool groupSort;
...@@ -217,6 +218,7 @@ typedef struct SWindowLogicNode { ...@@ -217,6 +218,7 @@ typedef struct SWindowLogicNode {
int64_t watermark; int64_t watermark;
int64_t deleteMark; int64_t deleteMark;
int8_t igExpired; int8_t igExpired;
int8_t igCheckUpdate;
EWindowAlgorithm windowAlgo; EWindowAlgorithm windowAlgo;
EOrder inputTsOrder; EOrder inputTsOrder;
EOrder outputTsOrder; EOrder outputTsOrder;
...@@ -357,6 +359,7 @@ typedef struct STableScanPhysiNode { ...@@ -357,6 +359,7 @@ typedef struct STableScanPhysiNode {
int64_t watermark; int64_t watermark;
int8_t igExpired; int8_t igExpired;
bool assignBlockUid; bool assignBlockUid;
int8_t igCheckUpdate;
} STableScanPhysiNode; } STableScanPhysiNode;
typedef STableScanPhysiNode STableSeqScanPhysiNode; typedef STableScanPhysiNode STableSeqScanPhysiNode;
......
...@@ -36,6 +36,7 @@ typedef struct SPlanContext { ...@@ -36,6 +36,7 @@ typedef struct SPlanContext {
int64_t watermark; int64_t watermark;
int64_t deleteMark; int64_t deleteMark;
int8_t igExpired; int8_t igExpired;
int8_t igCheckUpdate;
char* pMsg; char* pMsg;
int32_t msgLen; int32_t msgLen;
const char* pUser; const char* pUser;
......
...@@ -5425,6 +5425,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS ...@@ -5425,6 +5425,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1; if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
if (tEncodeCStr(&encoder, pField->name) < 0) return -1; if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
} }
if (tEncodeI8(&encoder, pReq->igUpdate) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
...@@ -5486,6 +5487,8 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea ...@@ -5486,6 +5487,8 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
} }
} }
if (tDecodeI8(&decoder, &pReq->igUpdate) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
......
...@@ -648,6 +648,7 @@ typedef struct { ...@@ -648,6 +648,7 @@ typedef struct {
int64_t checkpointFreq; // ms int64_t checkpointFreq; // ms
int64_t currentTick; // do not serialize int64_t currentTick; // do not serialize
int64_t deleteMark; int64_t deleteMark;
int8_t igCheckUpdate;
} SStreamObj; } SStreamObj;
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj); int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
......
...@@ -78,6 +78,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { ...@@ -78,6 +78,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
// 3.0.20 // 3.0.20
if (tEncodeI64(pEncoder, pObj->checkpointFreq) < 0) return -1; if (tEncodeI64(pEncoder, pObj->checkpointFreq) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->igCheckUpdate) < 0) return -1;
tEndEncode(pEncoder); tEndEncode(pEncoder);
return pEncoder->pos; return pEncoder->pos;
...@@ -145,6 +146,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) { ...@@ -145,6 +146,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
// 3.0.20 // 3.0.20
if (sver >= 2) { if (sver >= 2) {
if (tDecodeI64(pDecoder, &pObj->checkpointFreq) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->checkpointFreq) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->igCheckUpdate) < 0) return -1;
} }
tEndDecode(pDecoder); tEndDecode(pDecoder);
return 0; return 0;
...@@ -489,7 +491,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { ...@@ -489,7 +491,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
tlen += tEncodeSMqConsumerEp(buf, pConsumerEp); tlen += tEncodeSMqConsumerEp(buf, pConsumerEp);
cnt++; cnt++;
} }
if(cnt != sz) return -1; if (cnt != sz) return -1;
tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp); tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
tlen += taosEncodeString(buf, pSub->dbName); tlen += taosEncodeString(buf, pSub->dbName);
return tlen; return tlen;
......
...@@ -297,6 +297,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, ...@@ -297,6 +297,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pObj->triggerParam = pCreate->maxDelay; pObj->triggerParam = pCreate->maxDelay;
pObj->watermark = pCreate->watermark; pObj->watermark = pCreate->watermark;
pObj->fillHistory = pCreate->fillHistory; pObj->fillHistory = pCreate->fillHistory;
pObj->igCheckUpdate = pCreate->igUpdate;
memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN); memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB); SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
...@@ -345,6 +346,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, ...@@ -345,6 +346,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
.triggerType = pObj->trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->trigger, .triggerType = pObj->trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->trigger,
.watermark = pObj->watermark, .watermark = pObj->watermark,
.igExpired = pObj->igExpired, .igExpired = pObj->igExpired,
.igCheckUpdate = pObj->igCheckUpdate,
}; };
// using ast and param to build physical plan // using ast and param to build physical plan
......
...@@ -474,6 +474,8 @@ typedef struct SStreamScanInfo { ...@@ -474,6 +474,8 @@ typedef struct SStreamScanInfo {
int32_t blockRecoverContiCnt; int32_t blockRecoverContiCnt;
int32_t blockRecoverTotCnt; int32_t blockRecoverTotCnt;
int8_t igCheckUpdate;
int8_t igExpired;
} SStreamScanInfo; } SStreamScanInfo;
typedef struct { typedef struct {
......
...@@ -1109,7 +1109,7 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup ...@@ -1109,7 +1109,7 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup
SStreamScanInfo* pScanInfo = downstream->info; SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->partitionSup = *pParSup; pScanInfo->partitionSup = *pParSup;
pScanInfo->pPartScalarSup = pExpr; pScanInfo->pPartScalarSup = pExpr;
if (!pScanInfo->pUpdateInfo) { if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) {
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0); pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0);
} }
} }
......
...@@ -1435,7 +1435,12 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock ...@@ -1435,7 +1435,12 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
dumyInfo.cur.pageId = -1; dumyInfo.cur.pageId = -1;
bool isClosed = false; bool isClosed = false;
STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX}; STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX};
if (tableInserted && isOverdue(tsCol[rowId], &pInfo->twAggSup)) { bool overDue = isOverdue(tsCol[rowId], &pInfo->twAggSup);
if (pInfo->igExpired && overDue) {
continue;
}
if (tableInserted && overDue) {
win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC); win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
isClosed = isCloseWindow(&win, &pInfo->twAggSup); isClosed = isCloseWindow(&win, &pInfo->twAggSup);
} }
...@@ -1701,41 +1706,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1701,41 +1706,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
qDebug("stream scan called"); qDebug("stream scan called");
#if 0
SStreamState* pState = pTaskInfo->streamInfo.pState;
if (pState) {
printf(">>>>>>>> stream write backend\n");
SWinKey key = {
.ts = 1,
.groupId = 2,
};
char tmp[100] = "abcdefg1";
if (streamStatePut(pState, &key, &tmp, strlen(tmp) + 1) < 0) {
ASSERT(0);
}
key.ts = 2;
char tmp2[100] = "abcdefg2";
if (streamStatePut(pState, &key, &tmp2, strlen(tmp2) + 1) < 0) {
ASSERT(0);
}
key.groupId = 5;
key.ts = 1;
char tmp3[100] = "abcdefg3";
if (streamStatePut(pState, &key, &tmp3, strlen(tmp3) + 1) < 0) {
ASSERT(0);
}
char* val2 = NULL;
int32_t sz;
if (streamStateGet(pState, &key, (void**)&val2, &sz) < 0) {
ASSERT(0);
}
printf("stream read %s %d\n", val2, sz);
streamFreeVal(val2);
}
#endif
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 || if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1 ||
pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) { pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) {
...@@ -2368,6 +2338,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2368,6 +2338,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR); pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
pInfo->assignBlockUid = pTableScanNode->assignBlockUid; pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
pInfo->partitionSup.needCalc = false; pInfo->partitionSup.needCalc = false;
pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
pInfo->igExpired = pTableScanNode->igExpired;
pInfo->twAggSup.maxTs = INT64_MIN;
setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo, setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
pTaskInfo); pTaskInfo);
......
...@@ -1726,7 +1726,7 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSuppor ...@@ -1726,7 +1726,7 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSuppor
SStreamScanInfo* pScanInfo = downstream->info; SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->windowSup.parentType = type; pScanInfo->windowSup.parentType = type;
pScanInfo->windowSup.pIntervalAggSup = pSup; pScanInfo->windowSup.pIntervalAggSup = pSup;
if (!pScanInfo->pUpdateInfo) { if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) {
pScanInfo->pUpdateInfo = updateInfoInitP(pInterval, pTwSup->waterMark); pScanInfo->pUpdateInfo = updateInfoInitP(pInterval, pTwSup->waterMark);
} }
pScanInfo->interval = *pInterval; pScanInfo->interval = *pInterval;
...@@ -2893,7 +2893,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin ...@@ -2893,7 +2893,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin
} }
SStreamScanInfo* pScanInfo = downstream->info; SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type}; pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
if (!pScanInfo->pUpdateInfo) { if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) {
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark); pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark);
} }
pScanInfo->twAggSup = *pTwSup; pScanInfo->twAggSup = *pTwSup;
......
...@@ -380,6 +380,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { ...@@ -380,6 +380,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
COPY_SCALAR_FIELD(watermark); COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(deleteMark); COPY_SCALAR_FIELD(deleteMark);
COPY_SCALAR_FIELD(igExpired); COPY_SCALAR_FIELD(igExpired);
COPY_SCALAR_FIELD(igCheckUpdate);
CLONE_NODE_LIST_FIELD(pGroupTags); CLONE_NODE_LIST_FIELD(pGroupTags);
COPY_SCALAR_FIELD(groupSort); COPY_SCALAR_FIELD(groupSort);
CLONE_NODE_LIST_FIELD(pTags); CLONE_NODE_LIST_FIELD(pTags);
...@@ -467,6 +468,7 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p ...@@ -467,6 +468,7 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p
COPY_SCALAR_FIELD(watermark); COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(deleteMark); COPY_SCALAR_FIELD(deleteMark);
COPY_SCALAR_FIELD(igExpired); COPY_SCALAR_FIELD(igExpired);
COPY_SCALAR_FIELD(igCheckUpdate);
COPY_SCALAR_FIELD(windowAlgo); COPY_SCALAR_FIELD(windowAlgo);
COPY_SCALAR_FIELD(inputTsOrder); COPY_SCALAR_FIELD(inputTsOrder);
COPY_SCALAR_FIELD(outputTsOrder); COPY_SCALAR_FIELD(outputTsOrder);
......
...@@ -1553,6 +1553,7 @@ static const char* jkTableScanPhysiPlanGroupSort = "GroupSort"; ...@@ -1553,6 +1553,7 @@ static const char* jkTableScanPhysiPlanGroupSort = "GroupSort";
static const char* jkTableScanPhysiPlanTags = "Tags"; static const char* jkTableScanPhysiPlanTags = "Tags";
static const char* jkTableScanPhysiPlanSubtable = "Subtable"; static const char* jkTableScanPhysiPlanSubtable = "Subtable";
static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid"; static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid";
static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate";
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj; const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
...@@ -1618,6 +1619,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { ...@@ -1618,6 +1619,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanAssignBlockUid, pNode->assignBlockUid); code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanAssignBlockUid, pNode->assignBlockUid);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanIgnoreUpdate, pNode->igCheckUpdate);
}
return code; return code;
} }
...@@ -1686,6 +1690,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { ...@@ -1686,6 +1690,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanAssignBlockUid, &pNode->assignBlockUid); code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanAssignBlockUid, &pNode->assignBlockUid);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkTableScanPhysiPlanIgnoreUpdate, &pNode->igCheckUpdate);
}
return code; return code;
} }
......
...@@ -2078,6 +2078,9 @@ static int32_t physiTableScanNodeInlineToMsg(const void* pObj, STlvEncoder* pEnc ...@@ -2078,6 +2078,9 @@ static int32_t physiTableScanNodeInlineToMsg(const void* pObj, STlvEncoder* pEnc
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueBool(pEncoder, pNode->assignBlockUid); code = tlvEncodeValueBool(pEncoder, pNode->assignBlockUid);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI8(pEncoder, pNode->igCheckUpdate);
}
return code; return code;
} }
...@@ -2154,6 +2157,9 @@ static int32_t msgToPhysiTableScanNodeInline(STlvDecoder* pDecoder, void* pObj) ...@@ -2154,6 +2157,9 @@ static int32_t msgToPhysiTableScanNodeInline(STlvDecoder* pDecoder, void* pObj)
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueBool(pDecoder, &pNode->assignBlockUid); code = tlvDecodeValueBool(pDecoder, &pNode->assignBlockUid);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueI8(pDecoder, &pNode->igCheckUpdate);
}
return code; return code;
} }
......
...@@ -544,6 +544,7 @@ stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY duration_literal(C). ...@@ -544,6 +544,7 @@ stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY duration_literal(C).
stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; } stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreExpired = taosStr2Int8(C.z, NULL, 10); A = B; } stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreExpired = taosStr2Int8(C.z, NULL, 10); A = B; }
stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { ((SStreamOptions*)B)->fillHistory = taosStr2Int8(C.z, NULL, 10); A = B; } stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { ((SStreamOptions*)B)->fillHistory = taosStr2Int8(C.z, NULL, 10); A = B; }
stream_options(A) ::= stream_options(B) IGNORE UPDATE NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreUpdate = taosStr2Int8(C.z, NULL, 10); A = B; }
subtable_opt(A) ::= . { A = NULL; } subtable_opt(A) ::= . { A = NULL; }
subtable_opt(A) ::= SUBTABLE NK_LP expression(B) NK_RP. { A = releaseRawExprNode(pCxt, B); } subtable_opt(A) ::= SUBTABLE NK_LP expression(B) NK_RP. { A = releaseRawExprNode(pCxt, B); }
...@@ -1077,4 +1078,4 @@ null_ordering_opt(A) ::= NULLS LAST. ...@@ -1077,4 +1078,4 @@ null_ordering_opt(A) ::= NULLS LAST.
%fallback ABORT AFTER ATTACH BEFORE BEGIN BITAND BITNOT BITOR BLOCKS CHANGE COMMA COMPACT CONCAT CONFLICT COPY DEFERRED DELIMITERS DETACH DIVIDE DOT EACH END FAIL %fallback ABORT AFTER ATTACH BEFORE BEGIN BITAND BITNOT BITOR BLOCKS CHANGE COMMA COMPACT CONCAT CONFLICT COPY DEFERRED DELIMITERS DETACH DIVIDE DOT EACH END FAIL
FILE FOR GLOB ID IMMEDIATE IMPORT INITIALLY INSTEAD ISNULL KEY MODULES NK_BITNOT NK_SEMI NOTNULL OF PLUS PRIVILEGE RAISE REPLACE RESTRICT ROW SEMI STAR STATEMENT FILE FOR GLOB ID IMMEDIATE IMPORT INITIALLY INSTEAD ISNULL KEY MODULES NK_BITNOT NK_SEMI NOTNULL OF PLUS PRIVILEGE RAISE REPLACE RESTRICT ROW SEMI STAR STATEMENT
STRICT STRING TIMES UPDATE VALUES VARIABLE VIEW WAL. STRICT STRING TIMES VALUES VARIABLE VIEW WAL.
...@@ -1763,6 +1763,7 @@ SNode* createStreamOptions(SAstCreateContext* pCxt) { ...@@ -1763,6 +1763,7 @@ SNode* createStreamOptions(SAstCreateContext* pCxt) {
pOptions->triggerType = STREAM_TRIGGER_AT_ONCE; pOptions->triggerType = STREAM_TRIGGER_AT_ONCE;
pOptions->fillHistory = STREAM_DEFAULT_FILL_HISTORY; pOptions->fillHistory = STREAM_DEFAULT_FILL_HISTORY;
pOptions->ignoreExpired = STREAM_DEFAULT_IGNORE_EXPIRED; pOptions->ignoreExpired = STREAM_DEFAULT_IGNORE_EXPIRED;
pOptions->ignoreUpdate = STREAM_DEFAULT_IGNORE_UPDATE;
return (SNode*)pOptions; return (SNode*)pOptions;
} }
......
...@@ -233,6 +233,7 @@ static SKeyword keywordTable[] = { ...@@ -233,6 +233,7 @@ static SKeyword keywordTable[] = {
{"TTL", TK_TTL}, {"TTL", TK_TTL},
{"UNION", TK_UNION}, {"UNION", TK_UNION},
{"UNSIGNED", TK_UNSIGNED}, {"UNSIGNED", TK_UNSIGNED},
{"UPDATE", TK_UPDATE},
{"USE", TK_USE}, {"USE", TK_USE},
{"USER", TK_USER}, {"USER", TK_USER},
{"USERS", TK_USERS}, {"USERS", TK_USERS},
......
...@@ -5741,6 +5741,7 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* ...@@ -5741,6 +5741,7 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
pReq->watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0); pReq->watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0);
pReq->fillHistory = pStmt->pOptions->fillHistory; pReq->fillHistory = pStmt->pOptions->fillHistory;
pReq->igExpired = pStmt->pOptions->ignoreExpired; pReq->igExpired = pStmt->pOptions->ignoreExpired;
pReq->igUpdate = pStmt->pOptions->ignoreUpdate;
columnDefNodeToField(pStmt->pTags, &pReq->pTags); columnDefNodeToField(pStmt->pTags, &pReq->pTags);
pReq->numOfTags = LIST_LENGTH(pStmt->pTags); pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
} }
......
此差异已折叠。
...@@ -643,7 +643,8 @@ TEST_F(ParserInitialCTest, createStream) { ...@@ -643,7 +643,8 @@ TEST_F(ParserInitialCTest, createStream) {
auto setCreateStreamReq = [&](const char* pStream, const char* pSrcDb, const char* pSql, const char* pDstStb, auto setCreateStreamReq = [&](const char* pStream, const char* pSrcDb, const char* pSql, const char* pDstStb,
int8_t igExists = 0, int8_t triggerType = STREAM_TRIGGER_AT_ONCE, int64_t maxDelay = 0, int8_t igExists = 0, int8_t triggerType = STREAM_TRIGGER_AT_ONCE, int64_t maxDelay = 0,
int64_t watermark = 0, int8_t igExpired = STREAM_DEFAULT_IGNORE_EXPIRED, int64_t watermark = 0, int8_t igExpired = STREAM_DEFAULT_IGNORE_EXPIRED,
int8_t fillHistory = STREAM_DEFAULT_FILL_HISTORY) { int8_t fillHistory = STREAM_DEFAULT_FILL_HISTORY,
int8_t igUpdate = STREAM_DEFAULT_IGNORE_UPDATE) {
snprintf(expect.name, sizeof(expect.name), "0.%s", pStream); snprintf(expect.name, sizeof(expect.name), "0.%s", pStream);
snprintf(expect.sourceDB, sizeof(expect.sourceDB), "0.%s", pSrcDb); snprintf(expect.sourceDB, sizeof(expect.sourceDB), "0.%s", pSrcDb);
snprintf(expect.targetStbFullName, sizeof(expect.targetStbFullName), "0.test.%s", pDstStb); snprintf(expect.targetStbFullName, sizeof(expect.targetStbFullName), "0.test.%s", pDstStb);
...@@ -654,6 +655,7 @@ TEST_F(ParserInitialCTest, createStream) { ...@@ -654,6 +655,7 @@ TEST_F(ParserInitialCTest, createStream) {
expect.watermark = watermark; expect.watermark = watermark;
expect.fillHistory = fillHistory; expect.fillHistory = fillHistory;
expect.igExpired = igExpired; expect.igExpired = igExpired;
expect.igUpdate = igUpdate;
}; };
auto addTag = [&](const char* pFieldName, uint8_t type, int32_t bytes = 0) { auto addTag = [&](const char* pFieldName, uint8_t type, int32_t bytes = 0) {
...@@ -699,6 +701,7 @@ TEST_F(ParserInitialCTest, createStream) { ...@@ -699,6 +701,7 @@ TEST_F(ParserInitialCTest, createStream) {
ASSERT_EQ(pField->flags, pExpectField->flags); ASSERT_EQ(pField->flags, pExpectField->flags);
} }
} }
ASSERT_EQ(req.igUpdate, expect.igUpdate);
tFreeSCMCreateStreamReq(&req); tFreeSCMCreateStreamReq(&req);
}); });
...@@ -708,12 +711,11 @@ TEST_F(ParserInitialCTest, createStream) { ...@@ -708,12 +711,11 @@ TEST_F(ParserInitialCTest, createStream) {
setCreateStreamReq( setCreateStreamReq(
"s1", "test", "s1", "test",
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 1 into st1 " "create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 1 ignore "
"as select count(*) from t1 interval(10s)", "update 1 into st1 as select count(*) from t1 interval(10s)",
"st1", 1, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND, 0, 1); "st1", 1, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND, 0, 1, 1);
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 1 INTO st1 AS " run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 1 IGNORE "
"SELECT COUNT(*) " "UPDATE 1 INTO st1 AS SELECT COUNT(*) FROM t1 INTERVAL(10S)");
"FROM t1 INTERVAL(10S)");
clearCreateStreamReq(); clearCreateStreamReq();
setCreateStreamReq("s1", "test", setCreateStreamReq("s1", "test",
......
...@@ -343,6 +343,13 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect ...@@ -343,6 +343,13 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pScan->node.groupAction = GROUP_ACTION_NONE; pScan->node.groupAction = GROUP_ACTION_NONE;
pScan->node.resultDataOrder = DATA_ORDER_LEVEL_IN_BLOCK; pScan->node.resultDataOrder = DATA_ORDER_LEVEL_IN_BLOCK;
if (pCxt->pPlanCxt->streamQuery) {
pScan->triggerType = pCxt->pPlanCxt->triggerType;
pScan->watermark = pCxt->pPlanCxt->watermark;
pScan->deleteMark = pCxt->pPlanCxt->deleteMark;
pScan->igExpired = pCxt->pPlanCxt->igExpired;
pScan->igCheckUpdate = pCxt->pPlanCxt->igCheckUpdate;
}
// set columns to scan // set columns to scan
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -705,6 +712,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm ...@@ -705,6 +712,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
pWindow->watermark = pCxt->pPlanCxt->watermark; pWindow->watermark = pCxt->pPlanCxt->watermark;
pWindow->deleteMark = pCxt->pPlanCxt->deleteMark; pWindow->deleteMark = pCxt->pPlanCxt->deleteMark;
pWindow->igExpired = pCxt->pPlanCxt->igExpired; pWindow->igExpired = pCxt->pPlanCxt->igExpired;
pWindow->igCheckUpdate = pCxt->pPlanCxt->igCheckUpdate;
} }
pWindow->inputTsOrder = ORDER_ASC; pWindow->inputTsOrder = ORDER_ASC;
pWindow->outputTsOrder = ORDER_ASC; pWindow->outputTsOrder = ORDER_ASC;
......
...@@ -328,10 +328,6 @@ static void scanPathOptSetScanWin(SScanLogicNode* pScan) { ...@@ -328,10 +328,6 @@ static void scanPathOptSetScanWin(SScanLogicNode* pScan) {
pScan->sliding = ((SWindowLogicNode*)pParent)->sliding; pScan->sliding = ((SWindowLogicNode*)pParent)->sliding;
pScan->intervalUnit = ((SWindowLogicNode*)pParent)->intervalUnit; pScan->intervalUnit = ((SWindowLogicNode*)pParent)->intervalUnit;
pScan->slidingUnit = ((SWindowLogicNode*)pParent)->slidingUnit; pScan->slidingUnit = ((SWindowLogicNode*)pParent)->slidingUnit;
pScan->triggerType = ((SWindowLogicNode*)pParent)->triggerType;
pScan->watermark = ((SWindowLogicNode*)pParent)->watermark;
pScan->deleteMark = ((SWindowLogicNode*)pParent)->deleteMark;
pScan->igExpired = ((SWindowLogicNode*)pParent)->igExpired;
} }
} }
......
...@@ -582,6 +582,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp ...@@ -582,6 +582,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
pTableScan->triggerType = pScanLogicNode->triggerType; pTableScan->triggerType = pScanLogicNode->triggerType;
pTableScan->watermark = pScanLogicNode->watermark; pTableScan->watermark = pScanLogicNode->watermark;
pTableScan->igExpired = pScanLogicNode->igExpired; pTableScan->igExpired = pScanLogicNode->igExpired;
pTableScan->igCheckUpdate = pScanLogicNode->igCheckUpdate;
pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false; pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false;
int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode); int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
......
...@@ -81,6 +81,15 @@ if $rows == 0 then ...@@ -81,6 +81,15 @@ if $rows == 0 then
return -1 return -1
endi endi
sleep 3000
sql drop stream if exists streamd1;
sql drop stream if exists streamd2;
sql drop stream if exists streamd3;
sql drop stream if exists streamd4;
sql drop stream if exists streamd5;
sql drop stream if exists streamd6;
sql create stream streamd10 into streamd10 as select _wstart, _wend, count(*), first(ca), last(cb) as c2 from t1 interval(10s); sql create stream streamd10 into streamd10 as select _wstart, _wend, count(*), first(ca), last(cb) as c2 from t1 interval(10s);
sql desc streamd10; sql desc streamd10;
...@@ -100,15 +109,6 @@ if $rows == 0 then ...@@ -100,15 +109,6 @@ if $rows == 0 then
return -1 return -1
endi endi
sleep 3000
sql drop stream if exists streamd1;
sql drop stream if exists streamd2;
sql drop stream if exists streamd3;
sql drop stream if exists streamd4;
sql drop stream if exists streamd5;
sql drop stream if exists streamd6;
_OVER: _OVER:
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step 1 start
sql drop stream if exists streams0;
sql drop database if exists test;
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int);
print create stream streams0 trigger at_once ignore update 1 into streamt as select _wstart c1, count(*) c2, max(b) c3 from t1 interval(10s);
sql create stream streams0 trigger at_once ignore update 1 into streamt as select _wstart c1, count(*) c2, max(b) c3 from t1 interval(10s);
sql insert into t1 values(1648791213000,1,1,1);
sql insert into t1 values(1648791213000,2,2,2);
$loop_count = 0
loop0:
sleep 300
sql select * from streamt order by 1,2,3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop0
endi
if $data02 != 2 then
print =====data02=$data02
goto loop0
endi
sql insert into t1 values(1648791213000,3,3,3);
$loop_count = 0
loop1:
sleep 300
sql select * from streamt order by 1,2,3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 3 then
print =====data01=$data01
goto loop1
endi
if $data02 != 3 then
print =====data02=$data02
goto loop1
endi
print step 1 end
print step 2 start
sql drop stream if exists streams1;
sql drop database if exists test1;
sql create database test1 vgroups 1;
sql use test1;
sql create table t1(ts timestamp, a int, b int , c int);
print create stream streams1 trigger at_once ignore update 1 into streamt1 as select _wstart c1, count(*) c2, max(b) c3 from t1 session(ts, 10s);
sql create stream streams1 trigger at_once ignore update 1 into streamt1 as select _wstart c1, count(*) c2, max(b) c3 from t1 session(ts, 10s);
sql insert into t1 values(1648791213000,1,1,1);
sql insert into t1 values(1648791213000,2,2,2);
$loop_count = 0
loop2:
sleep 300
sql select * from streamt1 order by 1,2,3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop2
endi
if $data02 != 2 then
print =====data02=$data02
goto loop2
endi
sql insert into t1 values(1648791213000,3,3,3);
$loop_count = 0
loop3:
sleep 300
sql select * from streamt1 order by 1,2,3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 3 then
print =====data01=$data01
goto loop3
endi
if $data02 != 3 then
print =====data02=$data02
goto loop3
endi
print step 2 end
print step 3 start
sql drop stream if exists streams2;
sql drop database if exists test2;
sql create database test2 vgroups 1;
sql use test2;
sql create table t1(ts timestamp, a int, b int , c int);
print create stream streams2 trigger at_once ignore update 1 into streamt2 as select _wstart c1, count(*) c2, max(b) c3 from t1 state_window(c);
sql create stream streams2 trigger at_once ignore update 1 into streamt2 as select _wstart c1, count(*) c2, max(b) c3 from t1 state_window(c);
sql insert into t1 values(1648791213000,1,1,1);
sql insert into t1 values(1648791213000,2,2,1);
$loop_count = 0
loop2:
sleep 300
sql select * from streamt2 order by 1,2,3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 2 then
print =====data01=$data01
goto loop2
endi
if $data02 != 2 then
print =====data02=$data02
goto loop2
endi
sql insert into t1 values(1648791213000,3,3,1);
$loop_count = 0
loop3:
sleep 300
sql select * from streamt2 order by 1,2,3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 3 then
print =====data01=$data01
goto loop3
endi
if $data02 != 3 then
print =====data02=$data02
goto loop3
endi
print step 3 end
print step 4 start
sql drop stream if exists streams3;
sql drop database if exists test3;
sql create database test3 vgroups 4;
sql use test3;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
print create stream streams3 trigger at_once ignore update 1 into streamt3 as select _wstart c1, count(*) c2, max(b) c3 from st interval(10s);
sql create stream streams3 trigger at_once ignore update 1 into streamt3 as select _wstart c1, count(*) c2, max(b) c3 from st interval(10s);
sql insert into t1 values(1648791213000,1,1,1);
sql insert into t1 values(1648791213000,2,2,2);
sql insert into t2 values(1648791213000,1,1,1);
sql insert into t2 values(1648791213000,2,2,2);
$loop_count = 0
loop0:
sleep 300
sql select * from streamt3 order by 1,2,3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 4 then
print =====data01=$data01
goto loop0
endi
if $data02 != 2 then
print =====data02=$data02
goto loop0
endi
sql insert into t1 values(1648791213000,3,3,3);
$loop_count = 0
loop1:
sleep 300
sql select * from streamt3 order by 1,2,3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 5 then
print =====data01=$data01
goto loop1
endi
if $data02 != 3 then
print =====data02=$data02
goto loop1
endi
sql insert into t2 values(1648791213000,4,4,4);
$loop_count = 0
loop1:
sleep 300
sql select * from streamt3 order by 1,2,3;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 6 then
print =====data01=$data01
goto loop1
endi
if $data02 != 4 then
print =====data02=$data02
goto loop1
endi
print step 4 end
system sh/stop_dnodes.sh
...@@ -52,6 +52,7 @@ sql insert into t1 values(1648791213000,1,2,3,1.0); ...@@ -52,6 +52,7 @@ sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791223001,1,2,3,1.1); sql insert into t1 values(1648791223001,1,2,3,1.1);
sql insert into t1 values(1648791233002,2,2,3,2.1); sql insert into t1 values(1648791233002,2,2,3,2.1);
sql insert into t1 values(1648791243003,2,2,3,3.1); sql insert into t1 values(1648791243003,2,2,3,3.1);
sleep 300
sql insert into t1 values(1648791200000,4,2,3,4.1); sql insert into t1 values(1648791200000,4,2,3,4.1);
$loop_count = 0 $loop_count = 0
...@@ -115,6 +116,7 @@ sql create stream stream_t1 trigger at_once IGNORE EXPIRED 1 into streamtST1 as ...@@ -115,6 +116,7 @@ sql create stream stream_t1 trigger at_once IGNORE EXPIRED 1 into streamtST1 as
sql create stream stream_t2 trigger at_once IGNORE EXPIRED 1 into streamtST2 as select _wstart, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st session(ts, 10s) ; sql create stream stream_t2 trigger at_once IGNORE EXPIRED 1 into streamtST2 as select _wstart, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st session(ts, 10s) ;
sql insert into ts1 values(1648791211000,1,2,3); sql insert into ts1 values(1648791211000,1,2,3);
sql insert into ts1 values(1648791222001,2,2,3); sql insert into ts1 values(1648791222001,2,2,3);
sleep 300
sql insert into ts2 values(1648791211000,1,2,3); sql insert into ts2 values(1648791211000,1,2,3);
sql insert into ts2 values(1648791222001,2,2,3); sql insert into ts2 values(1648791222001,2,2,3);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册