提交 ac09a05c 编写于 作者: X Xiaoyu Wang

feat: create stream support delete_mark option

上级 ef12805c
......@@ -1768,6 +1768,7 @@ typedef struct {
SArray* pTags; // array of SField
// 3.0.20
int64_t checkpointFreq; // ms
int64_t deleteMark;
} SCMCreateStreamReq;
typedef struct {
......
......@@ -5424,6 +5424,7 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
}
if (tEncodeI64(&encoder, pReq->deleteMark) < 0) return -1;
tEndEncode(&encoder);
......@@ -5485,6 +5486,8 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
}
}
if (tDecodeI64(&decoder, &pReq->deleteMark) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
......
......@@ -544,6 +544,7 @@ stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY duration_literal(C).
stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreExpired = taosStr2Int8(C.z, NULL, 10); A = B; }
stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { ((SStreamOptions*)B)->fillHistory = taosStr2Int8(C.z, NULL, 10); A = B; }
stream_options(A) ::= stream_options(B) DELETE_MARK duration_literal(C). { ((SStreamOptions*)B)->pDeleteMark = releaseRawExprNode(pCxt, C); A = B; }
subtable_opt(A) ::= . { A = NULL; }
subtable_opt(A) ::= SUBTABLE NK_LP expression(B) NK_RP. { A = releaseRawExprNode(pCxt, B); }
......
......@@ -666,6 +666,9 @@ static uint8_t getPrecisionFromCurrStmt(SNode* pCurrStmt, uint8_t defaultVal) {
if (isSetOperator(pCurrStmt)) {
return ((SSetOperator*)pCurrStmt)->precision;
}
if (NULL != pCurrStmt && QUERY_NODE_CREATE_STREAM_STMT == nodeType(pCurrStmt)) {
return getPrecisionFromCurrStmt(((SCreateStreamStmt*)pCurrStmt)->pQuery, defaultVal);
}
return defaultVal;
}
......@@ -5483,16 +5486,6 @@ static bool crossTableWithUdaf(SSelectStmt* pSelect) {
}
static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
if (NULL != pStmt->pOptions->pWatermark &&
(DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark))) {
return pCxt->errCode;
}
if (NULL != pStmt->pOptions->pDelay &&
(DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pDelay))) {
return pCxt->errCode;
}
if (NULL == pStmt->pQuery) {
return TSDB_CODE_SUCCESS;
}
......@@ -5685,6 +5678,17 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
return code;
}
static int32_t translateStreamOptions(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
pCxt->pCurrStmt = (SNode*)pStmt;
SStreamOptions* pOptions = pStmt->pOptions;
if ((NULL != pOptions->pWatermark && (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pOptions->pWatermark))) ||
(NULL != pOptions->pDeleteMark && (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pOptions->pDeleteMark))) ||
(NULL != pOptions->pDelay && (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pOptions->pDelay)))) {
return pCxt->errCode;
}
return TSDB_CODE_SUCCESS;
}
static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
pReq->igExists = pStmt->ignoreExists;
......@@ -5706,10 +5710,16 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
}
}
if (TSDB_CODE_SUCCESS == code) {
code = translateStreamOptions(pCxt, pStmt);
}
if (TSDB_CODE_SUCCESS == code) {
pReq->triggerType = pStmt->pOptions->triggerType;
pReq->maxDelay = (NULL != pStmt->pOptions->pDelay ? ((SValueNode*)pStmt->pOptions->pDelay)->datum.i : 0);
pReq->watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0);
pReq->deleteMark =
(NULL != pStmt->pOptions->pDeleteMark ? ((SValueNode*)pStmt->pOptions->pDeleteMark)->datum.i : 0);
pReq->fillHistory = pStmt->pOptions->fillHistory;
pReq->igExpired = pStmt->pOptions->ignoreExpired;
columnDefNodeToField(pStmt->pTags, &pReq->pTags);
......
因为 它太大了无法显示 source diff 。你可以改为 查看blob
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册