未验证 提交 4ac19599 编写于 作者: L liuyao 提交者: GitHub

Merge pull request #15240 from taosdata/feature/TD-17357

feat(stream): optimize update data check
...@@ -33,11 +33,12 @@ typedef struct SUpdateInfo { ...@@ -33,11 +33,12 @@ typedef struct SUpdateInfo {
int64_t watermark; int64_t watermark;
TSKEY minTS; TSKEY minTS;
SScalableBf* pCloseWinSBF; SScalableBf* pCloseWinSBF;
SHashObj* pMap;
} SUpdateInfo; } SUpdateInfo;
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark); SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark);
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark); SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts); bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
void updateInfoDestroy(SUpdateInfo *pInfo); void updateInfoDestroy(SUpdateInfo *pInfo);
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo); void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo); void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
......
...@@ -15,9 +15,12 @@ ...@@ -15,9 +15,12 @@
#include "tstreamUpdate.h" #include "tstreamUpdate.h"
#include "ttime.h" #include "ttime.h"
#include "query.h"
#define DEFAULT_FALSE_POSITIVE 0.01 #define DEFAULT_FALSE_POSITIVE 0.01
#define DEFAULT_BUCKET_SIZE 131072 #define DEFAULT_BUCKET_SIZE 1310720
#define DEFAULT_MAP_CAPACITY 1310720
#define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 10)
#define ROWS_PER_MILLISECOND 1 #define ROWS_PER_MILLISECOND 1
#define MAX_NUM_SCALABLE_BF 100000 #define MAX_NUM_SCALABLE_BF 100000
#define MIN_NUM_SCALABLE_BF 10 #define MIN_NUM_SCALABLE_BF 10
...@@ -120,6 +123,8 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma ...@@ -120,6 +123,8 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
} }
pInfo->numBuckets = DEFAULT_BUCKET_SIZE; pInfo->numBuckets = DEFAULT_BUCKET_SIZE;
pInfo->pCloseWinSBF = NULL; pInfo->pCloseWinSBF = NULL;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK);
return pInfo; return pInfo;
} }
...@@ -149,8 +154,9 @@ static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) { ...@@ -149,8 +154,9 @@ static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) {
return res; return res;
} }
bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) { bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
int32_t res = TSDB_CODE_FAILED; int32_t res = TSDB_CODE_FAILED;
TSKEY* pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets; uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index); TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
if (ts < maxTs - pInfo->watermark) { if (ts < maxTs - pInfo->watermark) {
...@@ -167,7 +173,13 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) { ...@@ -167,7 +173,13 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) {
res = tScalableBfPut(pSBf, &ts, sizeof(TSKEY)); res = tScalableBfPut(pSBf, &ts, sizeof(TSKEY));
} }
if (maxTs < ts) { int32_t size = taosHashGetSize(pInfo->pMap);
if ( (!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && *pMapMaxTs < ts)) {
taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY));
return false;
}
if ( !pMapMaxTs && maxTs < ts ) {
taosArraySet(pInfo->pTsBuckets, index, &ts); taosArraySet(pInfo->pTsBuckets, index, &ts);
return false; return false;
} }
...@@ -177,7 +189,7 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) { ...@@ -177,7 +189,7 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) {
} else if (res == TSDB_CODE_SUCCESS) { } else if (res == TSDB_CODE_SUCCESS) {
return false; return false;
} }
qDebug("===stream===bucket:%d, tableId:%" PRIu64 ", maxTs:" PRIu64 ", maxMapTs:" PRIu64 ", ts:%" PRIu64, index, tableId, maxTs, *pMapMaxTs, ts);
// check from tsdb api // check from tsdb api
return true; return true;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册