From 42fcb433dced9b2ab8abe830ea086334027f7804 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Mon, 23 May 2022 11:08:27 +0800 Subject: [PATCH] fix(stream):adjust bloom filter --- source/libs/executor/src/timewindowoperator.c | 1 + source/libs/stream/src/tstreamUpdate.c | 19 +++++++++++++------ source/libs/stream/test/tstreamUpdateTest.cpp | 5 +++-- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 588c3e90e7..9e073ec05b 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1181,6 +1181,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win); pInfo->invertible = allInvertible(pInfo->binfo.pCtx, numOfCols); + pInfo->invertible = false; // Todo(liuyao): Dependent TSDB API // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) { diff --git a/source/libs/stream/src/tstreamUpdate.c b/source/libs/stream/src/tstreamUpdate.c index 7921193fa2..d21dadfe55 100644 --- a/source/libs/stream/src/tstreamUpdate.c +++ b/source/libs/stream/src/tstreamUpdate.c @@ -19,18 +19,24 @@ #define DEFAULT_FALSE_POSITIVE 0.01 #define DEFAULT_BUCKET_SIZE 1024 #define ROWS_PER_MILLISECOND 1 -#define MAX_NUM_SCALABLE_BF 120 +#define MAX_NUM_SCALABLE_BF 100000 #define MIN_NUM_SCALABLE_BF 10 #define DEFAULT_PREADD_BUCKET 1 #define MAX_INTERVAL MILLISECOND_PER_MINUTE #define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10) +#define DEFAULT_EXPECTED_ENTRIES 10000 + +static int64_t adjustExpEntries(int64_t entries) { + return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); +} static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) { if (pInfo->numSBFs < count) { count = pInfo->numSBFs; } for (uint64_t i = 0; i < count; ++i) { - SScalableBf *tsSBF = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND, DEFAULT_FALSE_POSITIVE); + int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND); + SScalableBf *tsSBF = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE); taosArrayPush(pInfo->pTsSBFs, &tsSBF); } } @@ -38,9 +44,9 @@ static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) { static void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count) { if (count < pInfo->numSBFs - 1) { for (uint64_t i = 0; i < count; ++i) { - SScalableBf *pTsSBFs = taosArrayGetP(pInfo->pTsSBFs, i); + SScalableBf *pTsSBFs = taosArrayGetP(pInfo->pTsSBFs, 0); tScalableBfDestroy(pTsSBFs); - taosArrayRemove(pInfo->pTsSBFs, i); + taosArrayRemove(pInfo->pTsSBFs, 0); } } else { taosArrayClearP(pInfo->pTsSBFs, (FDelete)tScalableBfDestroy); @@ -66,7 +72,7 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) { return val; } -static int64_t adjustWatermark(int64_t interval, int32_t watermark) { +static int64_t adjustWatermark(int64_t interval, int64_t watermark) { if (watermark <= 0 || watermark > MAX_NUM_SCALABLE_BF * interval) { watermark = MAX_NUM_SCALABLE_BF * interval; } else if (watermark < MIN_NUM_SCALABLE_BF * interval) { @@ -130,7 +136,8 @@ static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) { } SScalableBf *res = taosArrayGetP(pInfo->pTsSBFs, index); if (res == NULL) { - res = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND, DEFAULT_FALSE_POSITIVE); + int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND); + res = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE); taosArrayPush(pInfo->pTsSBFs, &res); } return res; diff --git a/source/libs/stream/test/tstreamUpdateTest.cpp b/source/libs/stream/test/tstreamUpdateTest.cpp index c1e4e2bec1..93e114db02 100644 --- a/source/libs/stream/test/tstreamUpdateTest.cpp +++ b/source/libs/stream/test/tstreamUpdateTest.cpp @@ -4,6 +4,7 @@ #include "ttime.h" using namespace std; +#define MAX_NUM_SCALABLE_BF 100000 TEST(TD_STREAM_UPDATE_TEST, update) { int64_t interval = 20 * 1000; @@ -91,11 +92,11 @@ TEST(TD_STREAM_UPDATE_TEST, update) { } SUpdateInfo *pSU4 = updateInfoInit(-1, TSDB_TIME_PRECISION_MILLI, -1); - GTEST_ASSERT_EQ(pSU4->watermark, 120 * pSU4->interval); + GTEST_ASSERT_EQ(pSU4->watermark, MAX_NUM_SCALABLE_BF * pSU4->interval); GTEST_ASSERT_EQ(pSU4->interval, MILLISECOND_PER_MINUTE); SUpdateInfo *pSU5 = updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0); - GTEST_ASSERT_EQ(pSU5->watermark, 120 * pSU4->interval); + GTEST_ASSERT_EQ(pSU5->watermark, MAX_NUM_SCALABLE_BF * pSU4->interval); GTEST_ASSERT_EQ(pSU5->interval, MILLISECOND_PER_MINUTE); -- GitLab