提交 0e914a19 编写于 作者: L liuyao

stream operartor checkpoint

上级 5a9461a6
...@@ -169,6 +169,7 @@ typedef enum EStreamType { ...@@ -169,6 +169,7 @@ typedef enum EStreamType {
STREAM_PULL_OVER, STREAM_PULL_OVER,
STREAM_FILL_OVER, STREAM_FILL_OVER,
STREAM_CREATE_CHILD_TABLE, STREAM_CREATE_CHILD_TABLE,
STREAM_CHECKPOINT,
} EStreamType; } EStreamType;
#pragma pack(push, 1) #pragma pack(push, 1)
......
...@@ -183,7 +183,6 @@ extern int32_t tsRpcRetryInterval; ...@@ -183,7 +183,6 @@ extern int32_t tsRpcRetryInterval;
extern bool tsDisableStream; extern bool tsDisableStream;
extern int64_t tsStreamBufferSize; extern int64_t tsStreamBufferSize;
extern int64_t tsCheckpointInterval;
extern bool tsFilterScalarMode; extern bool tsFilterScalarMode;
extern int32_t tsMaxStreamBackendCache; extern int32_t tsMaxStreamBackendCache;
extern int32_t tsPQSortMemThreshold; extern int32_t tsPQSortMemThreshold;
......
...@@ -363,13 +363,13 @@ typedef struct SStateStore { ...@@ -363,13 +363,13 @@ typedef struct SStateStore {
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
int32_t (*streamStateSessionGetKeyByRange)(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey); int32_t (*streamStateSessionGetKeyByRange)(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey);
SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark); SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp);
TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol); TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol);
bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts); bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts);
bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid); bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid);
void (*updateInfoDestroy)(SUpdateInfo* pInfo); void (*updateInfoDestroy)(SUpdateInfo* pInfo);
SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark); SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark, bool igUp);
void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo); void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo);
void (*updateInfoDestoryColseWinSBF)(SUpdateInfo* pInfo); void (*updateInfoDestoryColseWinSBF)(SUpdateInfo* pInfo);
int32_t (*updateInfoSerialize)(void* buf, int32_t bufLen, const SUpdateInfo* pInfo); int32_t (*updateInfoSerialize)(void* buf, int32_t bufLen, const SUpdateInfo* pInfo);
......
...@@ -43,8 +43,8 @@ typedef struct SUpdateKey { ...@@ -43,8 +43,8 @@ typedef struct SUpdateKey {
// uint64_t maxDataVersion; // uint64_t maxDataVersion;
//} SUpdateInfo; //} SUpdateInfo;
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark); SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark, bool igUp);
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark); SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp);
TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol); TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol);
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid); bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid);
......
...@@ -215,7 +215,6 @@ char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udf ...@@ -215,7 +215,6 @@ char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udf
char tsUdfdLdLibPath[512] = ""; char tsUdfdLdLibPath[512] = "";
bool tsDisableStream = false; bool tsDisableStream = false;
int64_t tsStreamBufferSize = 128 * 1024 * 1024; int64_t tsStreamBufferSize = 128 * 1024 * 1024;
int64_t tsCheckpointInterval = 3 * 60 * 60 * 1000;
bool tsFilterScalarMode = false; bool tsFilterScalarMode = false;
#ifndef _STORAGE #ifndef _STORAGE
...@@ -532,7 +531,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -532,7 +531,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, 0) != 0) return -1; if (cfgAddBool(pCfg, "disableStream", tsDisableStream, 0) != 0) return -1;
if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, 0) != 0) return -1; if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, 0) != 0) return -1;
if (cfgAddInt64(pCfg, "checkpointInterval", tsCheckpointInterval, 0, INT64_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, 0) != 0) return -1;
...@@ -918,7 +916,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -918,7 +916,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval; tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval;
tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64; tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64;
tsCheckpointInterval = cfgGetItem(pCfg, "checkpointInterval")->i64;
tsFilterScalarMode = cfgGetItem(pCfg, "filterScalarMode")->bval; tsFilterScalarMode = cfgGetItem(pCfg, "filterScalarMode")->bval;
tsMaxStreamBackendCache = cfgGetItem(pCfg, "maxStreamBackendCache")->i32; tsMaxStreamBackendCache = cfgGetItem(pCfg, "maxStreamBackendCache")->i32;
......
...@@ -317,8 +317,6 @@ typedef struct STimeWindowAggSupp { ...@@ -317,8 +317,6 @@ typedef struct STimeWindowAggSupp {
int64_t waterMark; int64_t waterMark;
TSKEY maxTs; TSKEY maxTs;
TSKEY minTs; TSKEY minTs;
TSKEY checkPointTs;
TSKEY checkPointInterval;
SColumnInfoData timeWindowData; // query time window info for scalar function execution. SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STimeWindowAggSupp; } STimeWindowAggSupp;
...@@ -353,8 +351,6 @@ typedef struct SStreamScanInfo { ...@@ -353,8 +351,6 @@ typedef struct SStreamScanInfo {
SExprSupp* pPartScalarSup; SExprSupp* pPartScalarSup;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA. bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
int32_t scanWinIndex; // for state operator int32_t scanWinIndex; // for state operator
int32_t pullDataResIndex;
SSDataBlock* pPullDataRes; // pull data SSDataBlock
SSDataBlock* pDeleteDataRes; // delete data SSDataBlock SSDataBlock* pDeleteDataRes; // delete data SSDataBlock
int32_t deleteDataIndex; int32_t deleteDataIndex;
STimeWindow updateWin; STimeWindow updateWin;
...@@ -429,6 +425,11 @@ typedef struct SMergeAlignedIntervalAggOperatorInfo { ...@@ -429,6 +425,11 @@ typedef struct SMergeAlignedIntervalAggOperatorInfo {
SResultRow* pResultRow; SResultRow* pResultRow;
} SMergeAlignedIntervalAggOperatorInfo; } SMergeAlignedIntervalAggOperatorInfo;
typedef struct SOpCheckPointInfo {
uint16_t checkPointId;
SHashObj* children; // key:child id
} SOpCheckPointInfo;
typedef struct SStreamIntervalOperatorInfo { typedef struct SStreamIntervalOperatorInfo {
SOptrBasicInfo binfo; // basic info SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter SAggSupporter aggSup; // aggregate supporter
...@@ -457,9 +458,12 @@ typedef struct SStreamIntervalOperatorInfo { ...@@ -457,9 +458,12 @@ typedef struct SStreamIntervalOperatorInfo {
SArray* pUpdated; SArray* pUpdated;
SSHashObj* pUpdatedMap; SSHashObj* pUpdatedMap;
int64_t dataVersion; int64_t dataVersion;
SStateStore statestore; SStateStore stateStore;
bool recvGetAll; bool recvGetAll;
SHashObj* pFinalPullDataMap; SHashObj* pFinalPullDataMap;
SOpCheckPointInfo checkPointInfo;
bool reCkBlock;
SSDataBlock* pCheckpointRes;
} SStreamIntervalOperatorInfo; } SStreamIntervalOperatorInfo;
typedef struct SDataGroupInfo { typedef struct SDataGroupInfo {
...@@ -493,7 +497,6 @@ typedef struct SStreamSessionAggOperatorInfo { ...@@ -493,7 +497,6 @@ typedef struct SStreamSessionAggOperatorInfo {
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
SSDataBlock* pWinBlock; // window result SSDataBlock* pWinBlock; // window result
SSDataBlock* pDelRes; // delete result SSDataBlock* pDelRes; // delete result
SSDataBlock* pUpdateRes; // update window
bool returnUpdate; bool returnUpdate;
SSHashObj* pStDeleted; SSHashObj* pStDeleted;
void* pDelIterator; void* pDelIterator;
...@@ -507,6 +510,8 @@ typedef struct SStreamSessionAggOperatorInfo { ...@@ -507,6 +510,8 @@ typedef struct SStreamSessionAggOperatorInfo {
int64_t dataVersion; int64_t dataVersion;
SArray* historyWins; SArray* historyWins;
bool isHistoryOp; bool isHistoryOp;
bool reCkBlock;
SSDataBlock* pCheckpointRes;
} SStreamSessionAggOperatorInfo; } SStreamSessionAggOperatorInfo;
typedef struct SStreamStateAggOperatorInfo { typedef struct SStreamStateAggOperatorInfo {
...@@ -528,6 +533,8 @@ typedef struct SStreamStateAggOperatorInfo { ...@@ -528,6 +533,8 @@ typedef struct SStreamStateAggOperatorInfo {
int64_t dataVersion; int64_t dataVersion;
bool isHistoryOp; bool isHistoryOp;
SArray* historyWins; SArray* historyWins;
bool reCkBlock;
SSDataBlock* pCheckpointRes;
} SStreamStateAggOperatorInfo; } SStreamStateAggOperatorInfo;
typedef struct SStreamPartitionOperatorInfo { typedef struct SStreamPartitionOperatorInfo {
...@@ -687,6 +694,9 @@ uint64_t calcGroupId(char* pData, int32_t len); ...@@ -687,6 +694,9 @@ uint64_t calcGroupId(char* pData, int32_t len);
void streamOpReleaseState(struct SOperatorInfo* pOperator); void streamOpReleaseState(struct SOperatorInfo* pOperator);
void streamOpReloadState(struct SOperatorInfo* pOperator); void streamOpReloadState(struct SOperatorInfo* pOperator);
int32_t encodeSTimeWindowAggSupp(void **buf, STimeWindowAggSupp* pTwAggSup);
void* decodeSTimeWindowAggSupp(void *buf, STimeWindowAggSupp* pTwAggSup);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -1343,6 +1343,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { ...@@ -1343,6 +1343,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN); memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
pInfo->srcRowIndex = -1; pInfo->srcRowIndex = -1;
} break; } break;
case STREAM_CHECKPOINT:
case STREAM_CREATE_CHILD_TABLE: { case STREAM_CREATE_CHILD_TABLE: {
return pBlock; return pBlock;
} break; } break;
......
...@@ -1128,9 +1128,13 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { ...@@ -1128,9 +1128,13 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
printDataBlock(pInfo->pDelRes, "stream partitionby delete"); printDataBlock(pInfo->pDelRes, "stream partitionby delete");
return pInfo->pDelRes; return pInfo->pDelRes;
} break; } break;
default: case STREAM_CREATE_CHILD_TABLE:
ASSERTS(pBlock->info.type == STREAM_CREATE_CHILD_TABLE || pBlock->info.type == STREAM_RETRIEVE, "invalid SSDataBlock type"); case STREAM_RETRIEVE:
case STREAM_CHECKPOINT: {
return pBlock; return pBlock;
}
default:
ASSERTS(0, "invalid SSDataBlock type");
} }
// there is an scalar expression that needs to be calculated right before apply the group aggregation. // there is an scalar expression that needs to be calculated right before apply the group aggregation.
...@@ -1183,8 +1187,8 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup ...@@ -1183,8 +1187,8 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup
SStreamScanInfo* pScanInfo = downstream->info; SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->partitionSup = *pParSup; pScanInfo->partitionSup = *pParSup;
pScanInfo->pPartScalarSup = pExpr; pScanInfo->pPartScalarSup = pExpr;
if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) { if (!pScanInfo->pUpdateInfo) {
pScanInfo->pUpdateInfo = pAPI->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0); pScanInfo->pUpdateInfo = pAPI->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0, pScanInfo->igCheckUpdate);
} }
} }
......
...@@ -40,11 +40,12 @@ ...@@ -40,11 +40,12 @@
int32_t scanDebug = 0; int32_t scanDebug = 0;
#define MULTI_READER_MAX_TABLE_NUM 5000 #define MULTI_READER_MAX_TABLE_NUM 5000
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
#define STREAM_SCAN_OP_NAME "StreamScanOperator" #define STREAM_SCAN_OP_NAME "StreamScanOperator"
#define STREAM_SCAN_OP_STATE_NAME "StreamScanFillHistoryState" #define STREAM_SCAN_OP_STATE_NAME "StreamScanFillHistoryState"
#define STREAM_SCAN_OP_CHECKPOINT_NAME "StreamScanOperator_Checkpoint"
typedef struct STableMergeScanExecInfo { typedef struct STableMergeScanExecInfo {
SFileBlockLoadRecorder blockRecorder; SFileBlockLoadRecorder blockRecorder;
...@@ -1756,21 +1757,33 @@ static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBl ...@@ -1756,21 +1757,33 @@ static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBl
} }
} }
//int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff) { int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff) {
// int32_t len = updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo); int32_t len = pInfo->stateStore.updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo);
// *pBuff = taosMemoryCalloc(1, len); len += encodeSTimeWindowAggSupp(NULL, &pInfo->twAggSup);
// updateInfoSerialize(*pBuff, len, pInfo->pUpdateInfo); *pBuff = taosMemoryCalloc(1, len);
// return len; void* buf = *pBuff;
//} encodeSTimeWindowAggSupp(&buf, &pInfo->twAggSup);
pInfo->stateStore.updateInfoSerialize(buf, len, pInfo->pUpdateInfo);
return len;
}
void streamScanOperatorSaveCheckpoint(SStreamScanInfo* pInfo) {
void* pBuf = NULL;
int32_t len = streamScanOperatorEncode(pInfo, pBuf);
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len);
}
// other properties are recovered from the execution plan // other properties are recovered from the execution plan
void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) { void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) {
if (!pBuff || len == 0) { if (!pBuff || len == 0) {
return; return;
} }
void* buf = pBuff;
buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);
int32_t tlen = len - (pBuff - buf);
void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo)); void* pUpInfo = pInfo->stateStore.updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0, pInfo->igCheckUpdate);
int32_t code = pInfo->stateStore.updateInfoDeserialize(pBuff, len, pUpInfo); int32_t code = pInfo->stateStore.updateInfoDeserialize(buf, tlen, pUpInfo);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
pInfo->pUpdateInfo = pUpInfo; pInfo->pUpdateInfo = pUpInfo;
} }
...@@ -1985,6 +1998,11 @@ FETCH_NEXT_BLOCK: ...@@ -1985,6 +1998,11 @@ FETCH_NEXT_BLOCK:
} }
} }
} break; } break;
case STREAM_CHECKPOINT: {
streamScanOperatorSaveCheckpoint(pInfo);
pAPI->stateStore.streamStateCommit(pInfo->pState);
pAPI->stateStore.streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark);
} break;
default: default:
break; break;
} }
...@@ -2317,7 +2335,6 @@ static void destroyStreamScanOperatorInfo(void* param) { ...@@ -2317,7 +2335,6 @@ static void destroyStreamScanOperatorInfo(void* param) {
pStreamScan->stateStore.updateInfoDestroy(pStreamScan->pUpdateInfo); pStreamScan->stateStore.updateInfoDestroy(pStreamScan->pUpdateInfo);
blockDataDestroy(pStreamScan->pRes); blockDataDestroy(pStreamScan->pRes);
blockDataDestroy(pStreamScan->pUpdateRes); blockDataDestroy(pStreamScan->pUpdateRes);
blockDataDestroy(pStreamScan->pPullDataRes);
blockDataDestroy(pStreamScan->pDeleteDataRes); blockDataDestroy(pStreamScan->pDeleteDataRes);
blockDataDestroy(pStreamScan->pUpdateDataRes); blockDataDestroy(pStreamScan->pUpdateDataRes);
blockDataDestroy(pStreamScan->pCreateTbRes); blockDataDestroy(pStreamScan->pCreateTbRes);
...@@ -2526,7 +2543,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2526,7 +2543,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN}; pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
pInfo->groupId = 0; pInfo->groupId = 0;
pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
pInfo->pStreamScanOp = pOperator; pInfo->pStreamScanOp = pOperator;
pInfo->deleteDataIndex = 0; pInfo->deleteDataIndex = 0;
pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA); pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA);
...@@ -2545,9 +2561,11 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2545,9 +2561,11 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
if (pTaskInfo->streamInfo.pState) { if (pTaskInfo->streamInfo.pState) {
void* buff = NULL; void* buff = NULL;
int32_t len = 0; int32_t len = 0;
pAPI->stateStore.streamStateGetInfo(pTaskInfo->streamInfo.pState, STREAM_SCAN_OP_NAME, strlen(STREAM_SCAN_OP_NAME), &buff, &len); int32_t res = pAPI->stateStore.streamStateGetInfo(pTaskInfo->streamInfo.pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), &buff, &len);
streamScanOperatorDecode(buff, len, pInfo); if (res == TSDB_CODE_SUCCESS) {
taosMemoryFree(buff); streamScanOperatorDecode(buff, len, pInfo);
taosMemoryFree(buff);
}
} }
setOperatorInfo(pOperator, STREAM_SCAN_OP_NAME, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo, setOperatorInfo(pOperator, STREAM_SCAN_OP_NAME, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "streamInc.h" #include "streamInt.h"
int32_t tEncodeSStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { int32_t tEncodeSStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
......
...@@ -89,11 +89,11 @@ static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t w ...@@ -89,11 +89,11 @@ static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t w
return watermark; return watermark;
} }
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark) { SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark, bool igUp) {
return updateInfoInit(pInterval->interval, pInterval->precision, watermark); return updateInfoInit(pInterval->interval, pInterval->precision, watermark, igUp);
} }
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark) { SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp) {
SUpdateInfo *pInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo)); SUpdateInfo *pInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
if (pInfo == NULL) { if (pInfo == NULL) {
return NULL; return NULL;
...@@ -104,30 +104,33 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma ...@@ -104,30 +104,33 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
pInfo->interval = adjustInterval(interval, precision); pInfo->interval = adjustInterval(interval, precision);
pInfo->watermark = adjustWatermark(pInfo->interval, interval, watermark); pInfo->watermark = adjustWatermark(pInfo->interval, interval, watermark);
uint64_t bfSize = (uint64_t)(pInfo->watermark / pInfo->interval); uint64_t bfSize = 0;
if (!igUp) {
bfSize = (uint64_t)(pInfo->watermark / pInfo->interval);
pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(void *)); pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(void *));
if (pInfo->pTsSBFs == NULL) { if (pInfo->pTsSBFs == NULL) {
updateInfoDestroy(pInfo); updateInfoDestroy(pInfo);
return NULL; return NULL;
} }
pInfo->numSBFs = bfSize; windowSBfAdd(pInfo, bfSize);
windowSBfAdd(pInfo, bfSize);
pInfo->pTsBuckets = taosArrayInit(DEFAULT_BUCKET_SIZE, sizeof(TSKEY)); pInfo->pTsBuckets = taosArrayInit(DEFAULT_BUCKET_SIZE, sizeof(TSKEY));
if (pInfo->pTsBuckets == NULL) { if (pInfo->pTsBuckets == NULL) {
updateInfoDestroy(pInfo); updateInfoDestroy(pInfo);
return NULL; return NULL;
} }
TSKEY dumy = 0; TSKEY dumy = 0;
for (uint64_t i = 0; i < DEFAULT_BUCKET_SIZE; ++i) { for (uint64_t i = 0; i < DEFAULT_BUCKET_SIZE; ++i) {
taosArrayPush(pInfo->pTsBuckets, &dumy); taosArrayPush(pInfo->pTsBuckets, &dumy);
}
pInfo->numBuckets = DEFAULT_BUCKET_SIZE;
pInfo->pCloseWinSBF = NULL;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK);
} }
pInfo->numBuckets = DEFAULT_BUCKET_SIZE; pInfo->numSBFs = bfSize;
pInfo->pCloseWinSBF = NULL;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK);
pInfo->maxDataVersion = 0; pInfo->maxDataVersion = 0;
return pInfo; return pInfo;
} }
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1 -v debugFlag 135
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database
sql create database test vgroups 1;
sql select * from information_schema.ins_databases
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from t1 interval(10s);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791213001,2,2,3,1.1);
$loop_count = 0
loop0:
sleep 1000
sql select * from streamt;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $rows != 1 then
print =====rows=$rows expect 1
goto loop0
endi
# row 0
if $data01 != 2 then
print =====data01=$data01
goto loop0
endi
if $data02 != 3 then
print =====data02=$data02
goto loop0
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
sql insert into t1 values(1648791213002,3,2,3,1.1);
$loop_count = 0
loop1:
sleep 1000
sql select * from streamt;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $rows != 1 then
print =====rows=$rows expect 1
goto loop1
endi
# row 0
if $data01 != 3 then
print =====data01=$data01
goto loop1
endi
if $data02 != 6 then
print =====data02=$data02
goto loop1
endi
sql insert into t1 values(1648791223002,4,2,3,1.1);
$loop_count = 0
loop2:
sleep 1000
sql select * from streamt;
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
if $rows != 2 then
print =====rows=$rows expect 2
goto loop2
endi
# row 0
if $data01 != 3 then
print =====data01=$data01
goto loop2
endi
if $data02 != 6 then
print =====data02=$data02
goto loop2
endi
# row 1
if $data11 != 1 then
print =====data01=$data01
goto loop2
endi
if $data12 != 4 then
print =====data02=$data02
goto loop2
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册