未验证 提交 265e14d0 编写于 作者: L liuyao 提交者: GitHub

Merge pull request #12847 from taosdata/feature/TD-15855

fix(stream):adjust bloom filter
...@@ -1181,6 +1181,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1181,6 +1181,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
pInfo->invertible = allInvertible(pInfo->binfo.pCtx, numOfCols); pInfo->invertible = allInvertible(pInfo->binfo.pCtx, numOfCols);
pInfo->invertible = false; // Todo(liuyao): Dependent TSDB API
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) { if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) {
......
...@@ -19,18 +19,24 @@ ...@@ -19,18 +19,24 @@
#define DEFAULT_FALSE_POSITIVE 0.01 #define DEFAULT_FALSE_POSITIVE 0.01
#define DEFAULT_BUCKET_SIZE 1024 #define DEFAULT_BUCKET_SIZE 1024
#define ROWS_PER_MILLISECOND 1 #define ROWS_PER_MILLISECOND 1
#define MAX_NUM_SCALABLE_BF 120 #define MAX_NUM_SCALABLE_BF 100000
#define MIN_NUM_SCALABLE_BF 10 #define MIN_NUM_SCALABLE_BF 10
#define DEFAULT_PREADD_BUCKET 1 #define DEFAULT_PREADD_BUCKET 1
#define MAX_INTERVAL MILLISECOND_PER_MINUTE #define MAX_INTERVAL MILLISECOND_PER_MINUTE
#define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10) #define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10)
#define DEFAULT_EXPECTED_ENTRIES 10000
static int64_t adjustExpEntries(int64_t entries) {
return TMIN(DEFAULT_EXPECTED_ENTRIES, entries);
}
static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) { static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
if (pInfo->numSBFs < count) { if (pInfo->numSBFs < count) {
count = pInfo->numSBFs; count = pInfo->numSBFs;
} }
for (uint64_t i = 0; i < count; ++i) { for (uint64_t i = 0; i < count; ++i) {
SScalableBf *tsSBF = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND, DEFAULT_FALSE_POSITIVE); int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
SScalableBf *tsSBF = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE);
taosArrayPush(pInfo->pTsSBFs, &tsSBF); taosArrayPush(pInfo->pTsSBFs, &tsSBF);
} }
} }
...@@ -38,9 +44,9 @@ static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) { ...@@ -38,9 +44,9 @@ static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
static void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count) { static void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count) {
if (count < pInfo->numSBFs - 1) { if (count < pInfo->numSBFs - 1) {
for (uint64_t i = 0; i < count; ++i) { for (uint64_t i = 0; i < count; ++i) {
SScalableBf *pTsSBFs = taosArrayGetP(pInfo->pTsSBFs, i); SScalableBf *pTsSBFs = taosArrayGetP(pInfo->pTsSBFs, 0);
tScalableBfDestroy(pTsSBFs); tScalableBfDestroy(pTsSBFs);
taosArrayRemove(pInfo->pTsSBFs, i); taosArrayRemove(pInfo->pTsSBFs, 0);
} }
} else { } else {
taosArrayClearP(pInfo->pTsSBFs, (FDelete)tScalableBfDestroy); taosArrayClearP(pInfo->pTsSBFs, (FDelete)tScalableBfDestroy);
...@@ -66,7 +72,7 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) { ...@@ -66,7 +72,7 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) {
return val; return val;
} }
static int64_t adjustWatermark(int64_t interval, int32_t watermark) { static int64_t adjustWatermark(int64_t interval, int64_t watermark) {
if (watermark <= 0 || watermark > MAX_NUM_SCALABLE_BF * interval) { if (watermark <= 0 || watermark > MAX_NUM_SCALABLE_BF * interval) {
watermark = MAX_NUM_SCALABLE_BF * interval; watermark = MAX_NUM_SCALABLE_BF * interval;
} else if (watermark < MIN_NUM_SCALABLE_BF * interval) { } else if (watermark < MIN_NUM_SCALABLE_BF * interval) {
...@@ -130,7 +136,8 @@ static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) { ...@@ -130,7 +136,8 @@ static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) {
} }
SScalableBf *res = taosArrayGetP(pInfo->pTsSBFs, index); SScalableBf *res = taosArrayGetP(pInfo->pTsSBFs, index);
if (res == NULL) { if (res == NULL) {
res = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND, DEFAULT_FALSE_POSITIVE); int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
res = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE);
taosArrayPush(pInfo->pTsSBFs, &res); taosArrayPush(pInfo->pTsSBFs, &res);
} }
return res; return res;
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include "ttime.h" #include "ttime.h"
using namespace std; using namespace std;
#define MAX_NUM_SCALABLE_BF 100000
TEST(TD_STREAM_UPDATE_TEST, update) { TEST(TD_STREAM_UPDATE_TEST, update) {
int64_t interval = 20 * 1000; int64_t interval = 20 * 1000;
...@@ -91,11 +92,11 @@ TEST(TD_STREAM_UPDATE_TEST, update) { ...@@ -91,11 +92,11 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
} }
SUpdateInfo *pSU4 = updateInfoInit(-1, TSDB_TIME_PRECISION_MILLI, -1); SUpdateInfo *pSU4 = updateInfoInit(-1, TSDB_TIME_PRECISION_MILLI, -1);
GTEST_ASSERT_EQ(pSU4->watermark, 120 * pSU4->interval); GTEST_ASSERT_EQ(pSU4->watermark, MAX_NUM_SCALABLE_BF * pSU4->interval);
GTEST_ASSERT_EQ(pSU4->interval, MILLISECOND_PER_MINUTE); GTEST_ASSERT_EQ(pSU4->interval, MILLISECOND_PER_MINUTE);
SUpdateInfo *pSU5 = updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0); SUpdateInfo *pSU5 = updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0);
GTEST_ASSERT_EQ(pSU5->watermark, 120 * pSU4->interval); GTEST_ASSERT_EQ(pSU5->watermark, MAX_NUM_SCALABLE_BF * pSU4->interval);
GTEST_ASSERT_EQ(pSU5->interval, MILLISECOND_PER_MINUTE); GTEST_ASSERT_EQ(pSU5->interval, MILLISECOND_PER_MINUTE);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册