未验证 提交 55fd6ade 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #12808 from taosdata/feature/stream

fix: memory error
...@@ -17,21 +17,20 @@ ...@@ -17,21 +17,20 @@
#include "ttime.h" #include "ttime.h"
#define DEFAULT_FALSE_POSITIVE 0.01 #define DEFAULT_FALSE_POSITIVE 0.01
#define DEFAULT_BUCKET_SIZE 1024 #define DEFAULT_BUCKET_SIZE 1024
#define ROWS_PER_MILLISECOND 1 #define ROWS_PER_MILLISECOND 1
#define MAX_NUM_SCALABLE_BF 120 #define MAX_NUM_SCALABLE_BF 120
#define MIN_NUM_SCALABLE_BF 10 #define MIN_NUM_SCALABLE_BF 10
#define DEFAULT_PREADD_BUCKET 1 #define DEFAULT_PREADD_BUCKET 1
#define MAX_INTERVAL MILLISECOND_PER_MINUTE #define MAX_INTERVAL MILLISECOND_PER_MINUTE
#define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10) #define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10)
static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) { static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
if (pInfo->numSBFs < count ) { if (pInfo->numSBFs < count) {
count = pInfo->numSBFs; count = pInfo->numSBFs;
} }
for (uint64_t i = 0; i < count; ++i) { for (uint64_t i = 0; i < count; ++i) {
SScalableBf *tsSBF = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND, SScalableBf *tsSBF = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND, DEFAULT_FALSE_POSITIVE);
DEFAULT_FALSE_POSITIVE);
taosArrayPush(pInfo->pTsSBFs, &tsSBF); taosArrayPush(pInfo->pTsSBFs, &tsSBF);
} }
} }
...@@ -76,7 +75,7 @@ static int64_t adjustWatermark(int64_t interval, int32_t watermark) { ...@@ -76,7 +75,7 @@ static int64_t adjustWatermark(int64_t interval, int32_t watermark) {
return watermark; return watermark;
} }
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark) { SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark) {
return updateInfoInit(pInterval->interval, pInterval->precision, watermark); return updateInfoInit(pInterval->interval, pInterval->precision, watermark);
} }
...@@ -93,7 +92,7 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma ...@@ -93,7 +92,7 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
uint64_t bfSize = (uint64_t)(pInfo->watermark / pInfo->interval); uint64_t bfSize = (uint64_t)(pInfo->watermark / pInfo->interval);
pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(SScalableBf)); pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(void *));
if (pInfo->pTsSBFs == NULL) { if (pInfo->pTsSBFs == NULL) {
updateInfoDestroy(pInfo); updateInfoDestroy(pInfo);
return NULL; return NULL;
...@@ -108,14 +107,14 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma ...@@ -108,14 +107,14 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
} }
TSKEY dumy = 0; TSKEY dumy = 0;
for(uint64_t i=0; i < DEFAULT_BUCKET_SIZE; ++i) { for (uint64_t i = 0; i < DEFAULT_BUCKET_SIZE; ++i) {
taosArrayPush(pInfo->pTsBuckets, &dumy); taosArrayPush(pInfo->pTsBuckets, &dumy);
} }
pInfo->numBuckets = DEFAULT_BUCKET_SIZE; pInfo->numBuckets = DEFAULT_BUCKET_SIZE;
return pInfo; return pInfo;
} }
static SScalableBf* getSBf(SUpdateInfo *pInfo, TSKEY ts) { static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) {
if (ts <= 0) { if (ts <= 0) {
return NULL; return NULL;
} }
...@@ -131,24 +130,23 @@ static SScalableBf* getSBf(SUpdateInfo *pInfo, TSKEY ts) { ...@@ -131,24 +130,23 @@ static SScalableBf* getSBf(SUpdateInfo *pInfo, TSKEY ts) {
} }
SScalableBf *res = taosArrayGetP(pInfo->pTsSBFs, index); SScalableBf *res = taosArrayGetP(pInfo->pTsSBFs, index);
if (res == NULL) { if (res == NULL) {
res = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND, res = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND, DEFAULT_FALSE_POSITIVE);
DEFAULT_FALSE_POSITIVE);
taosArrayPush(pInfo->pTsSBFs, &res); taosArrayPush(pInfo->pTsSBFs, &res);
} }
return res; return res;
} }
bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) { bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) {
int32_t res = TSDB_CODE_FAILED; int32_t res = TSDB_CODE_FAILED;
uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets; uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
SScalableBf* pSBf = getSBf(pInfo, ts); SScalableBf *pSBf = getSBf(pInfo, ts);
// pSBf may be a null pointer // pSBf may be a null pointer
if (pSBf) { if (pSBf) {
res = tScalableBfPut(pSBf, &ts, sizeof(TSKEY)); res = tScalableBfPut(pSBf, &ts, sizeof(TSKEY));
} }
TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index); TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
if (maxTs < ts ) { if (maxTs < ts) {
taosArraySet(pInfo->pTsBuckets, index, &ts); taosArraySet(pInfo->pTsBuckets, index, &ts);
return false; return false;
} }
...@@ -159,7 +157,7 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) { ...@@ -159,7 +157,7 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) {
return false; return false;
} }
//check from tsdb api // check from tsdb api
return true; return true;
} }
...@@ -174,7 +172,7 @@ void updateInfoDestroy(SUpdateInfo *pInfo) { ...@@ -174,7 +172,7 @@ void updateInfoDestroy(SUpdateInfo *pInfo) {
SScalableBf *pSBF = taosArrayGetP(pInfo->pTsSBFs, i); SScalableBf *pSBF = taosArrayGetP(pInfo->pTsSBFs, i);
tScalableBfDestroy(pSBF); tScalableBfDestroy(pSBF);
} }
taosArrayDestroy(pInfo->pTsSBFs); taosArrayDestroy(pInfo->pTsSBFs);
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册