未验证 提交 36ef9a53 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #22256 from taosdata/fix/TD-25457

check update data
...@@ -368,6 +368,8 @@ typedef struct SStateStore { ...@@ -368,6 +368,8 @@ typedef struct SStateStore {
bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts); bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts);
bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid); bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid);
void (*updateInfoDestroy)(SUpdateInfo* pInfo); void (*updateInfoDestroy)(SUpdateInfo* pInfo);
void (*windowSBfDelete)(SUpdateInfo *pInfo, uint64_t count);
void (*windowSBfAdd)(SUpdateInfo *pInfo, uint64_t count);
SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark); SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark);
void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo); void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo);
......
...@@ -53,6 +53,8 @@ void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo); ...@@ -53,6 +53,8 @@ void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo); void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo); int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo);
int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo); int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo);
void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count);
void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -78,6 +78,8 @@ void initStateStoreAPI(SStateStore* pStore) { ...@@ -78,6 +78,8 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->updateInfoIsUpdated = updateInfoIsUpdated; pStore->updateInfoIsUpdated = updateInfoIsUpdated;
pStore->updateInfoIsTableInserted = updateInfoIsTableInserted; pStore->updateInfoIsTableInserted = updateInfoIsTableInserted;
pStore->updateInfoDestroy = updateInfoDestroy; pStore->updateInfoDestroy = updateInfoDestroy;
pStore->windowSBfDelete = windowSBfDelete;
pStore->windowSBfAdd = windowSBfAdd;
pStore->updateInfoInitP = updateInfoInitP; pStore->updateInfoInitP = updateInfoInitP;
pStore->updateInfoAddCloseWindowSBF = updateInfoAddCloseWindowSBF; pStore->updateInfoAddCloseWindowSBF = updateInfoAddCloseWindowSBF;
......
...@@ -180,6 +180,8 @@ void initStateStoreAPI(SStateStore* pStore) { ...@@ -180,6 +180,8 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->updateInfoIsUpdated = updateInfoIsUpdated; pStore->updateInfoIsUpdated = updateInfoIsUpdated;
pStore->updateInfoIsTableInserted = updateInfoIsTableInserted; pStore->updateInfoIsTableInserted = updateInfoIsTableInserted;
pStore->updateInfoDestroy = updateInfoDestroy; pStore->updateInfoDestroy = updateInfoDestroy;
pStore->windowSBfDelete = windowSBfDelete;
pStore->windowSBfAdd = windowSBfAdd;
pStore->updateInfoInitP = updateInfoInitP; pStore->updateInfoInitP = updateInfoInitP;
pStore->updateInfoAddCloseWindowSBF = updateInfoAddCloseWindowSBF; pStore->updateInfoAddCloseWindowSBF = updateInfoAddCloseWindowSBF;
......
...@@ -2424,7 +2424,9 @@ void streamScanReloadState(SOperatorInfo* pOperator) { ...@@ -2424,7 +2424,9 @@ void streamScanReloadState(SOperatorInfo* pOperator) {
pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo); pInfo->stateStore.updateInfoDestroy(pInfo->pUpdateInfo);
pInfo->pUpdateInfo = pUpInfo; pInfo->pUpdateInfo = pUpInfo;
} else { } else {
pInfo->pUpdateInfo->minTS = TMAX(pInfo->pUpdateInfo->minTS, pUpInfo->minTS); pInfo->stateStore.windowSBfDelete(pInfo->pUpdateInfo, 1);
pInfo->stateStore.windowSBfAdd(pInfo->pUpdateInfo, 1);
ASSERT(pInfo->pUpdateInfo->minTS > pUpInfo->minTS);
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pUpInfo->maxDataVersion); pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pUpInfo->maxDataVersion);
SHashObj* curMap = pInfo->pUpdateInfo->pMap; SHashObj* curMap = pInfo->pUpdateInfo->pMap;
void *pIte = taosHashIterate(curMap, NULL); void *pIte = taosHashIterate(curMap, NULL);
......
...@@ -33,7 +33,7 @@ ...@@ -33,7 +33,7 @@
static int64_t adjustExpEntries(int64_t entries) { return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); } static int64_t adjustExpEntries(int64_t entries) { return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); }
static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) { void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
if (pInfo->numSBFs < count) { if (pInfo->numSBFs < count) {
count = pInfo->numSBFs; count = pInfo->numSBFs;
} }
...@@ -49,7 +49,7 @@ static void clearItemHelper(void *p) { ...@@ -49,7 +49,7 @@ static void clearItemHelper(void *p) {
tScalableBfDestroy(*pBf); tScalableBfDestroy(*pBf);
} }
static void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count) { void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count) {
if (count < pInfo->numSBFs) { if (count < pInfo->numSBFs) {
for (uint64_t i = 0; i < count; ++i) { for (uint64_t i = 0; i < count; ++i) {
SScalableBf *pTsSBFs = taosArrayGetP(pInfo->pTsSBFs, 0); SScalableBf *pTsSBFs = taosArrayGetP(pInfo->pTsSBFs, 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册