提交 41e2538a 编写于 作者: 5 54liuyao

scalable bloom filter encode & decode

上级 500a0ca5
......@@ -47,6 +47,8 @@ bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, u
void updateInfoDestroy(SUpdateInfo *pInfo);
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo);
int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo);
#ifdef __cplusplus
}
......
......@@ -17,6 +17,7 @@
#define _TD_UTIL_BLOOMFILTER_H_
#include "os.h"
#include "tencode.h"
#include "thash.h"
#ifdef __cplusplus
......@@ -42,6 +43,8 @@ int32_t tBloomFilterNoContain(const SBloomFilter *pBF, const void *keyBuf,
void tBloomFilterDestroy(SBloomFilter *pBF);
void tBloomFilterDump(const SBloomFilter *pBF);
bool tBloomFilterIsFull(const SBloomFilter *pBF);
int32_t tBloomFilterEncode(const SBloomFilter *pBF, SEncoder* pEncoder);
SBloomFilter* tBloomFilterDecode(SDecoder* pDecoder);
#ifdef __cplusplus
}
......
......@@ -33,7 +33,8 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len);
int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf,
uint32_t len);
void tScalableBfDestroy(SScalableBf *pSBf);
void tScalableBfDump(const SScalableBf *pSBf);
int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder* pEncoder);
SScalableBf* tScalableBfDecode(SDecoder* pDecoder);
#ifdef __cplusplus
}
......
......@@ -14,6 +14,7 @@
*/
#include "tstreamUpdate.h"
#include "tencode.h"
#include "ttime.h"
#include "query.h"
......@@ -250,3 +251,110 @@ void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo) {
tScalableBfDestroy(pInfo->pCloseWinSBF);
pInfo->pCloseWinSBF = NULL;
}
int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) {
ASSERT(pInfo);
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
int32_t size = taosArrayGetSize(pInfo->pTsBuckets);
if (tEncodeI32(&encoder, size) < 0) return -1;
for (int32_t i = 0; i < size; i++) {
TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->pTsBuckets, i);
if (tEncodeI64(&encoder, *pTs) < 0) return -1;
}
if (tEncodeU64(&encoder, pInfo->numBuckets) < 0) return -1;
int32_t sBfSize = taosArrayGetSize(pInfo->pTsSBFs);
if (tEncodeI32(&encoder, sBfSize) < 0) return -1;
for (int32_t i = 0; i < sBfSize; i++) {
SScalableBf* pSBf = taosArrayGetP(pInfo->pTsSBFs, i);
if (tScalableBfEncode(pSBf, &encoder) < 0) return -1;
}
if (tEncodeU64(&encoder, pInfo->numSBFs) < 0) return -1;
if (tEncodeI64(&encoder, pInfo->interval) < 0) return -1;
if (tEncodeI64(&encoder, pInfo->watermark) < 0) return -1;
if (tEncodeI64(&encoder, pInfo->minTS) < 0) return -1;
if (tScalableBfEncode(pInfo->pCloseWinSBF, &encoder) < 0) return -1;
int32_t mapSize = taosHashGetSize(pInfo->pMap);
if (tEncodeI32(&encoder, mapSize) < 0) return -1;
void* pIte = NULL;
size_t keyLen = 0;
while ((pIte = taosHashIterate(pInfo->pMap, pIte)) != NULL) {
void* key = taosHashGetKey(pIte, &keyLen);
if (tEncodeU64(&encoder, *(uint64_t*)key) < 0) return -1;
if (tEncodeI64(&encoder, *(TSKEY*)pIte) < 0) return -1;
}
if (tEncodeI64(&encoder, pInfo->scanWindow.skey) < 0) return -1;
if (tEncodeI64(&encoder, pInfo->scanWindow.ekey) < 0) return -1;
if (tEncodeU64(&encoder, pInfo->scanGroupId) < 0) return -1;
if (tEncodeU64(&encoder, pInfo->maxVersion) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
ASSERT(pInfo);
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
int32_t size = 0;
if (tDecodeI32(&decoder, &size) < 0) return -1;
pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY));
TSKEY ts = INT64_MIN;
for (int32_t i = 0; i < size; i++) {
if (tDecodeI64(&decoder, &ts) < 0) return -1;
taosArrayPush(pInfo->pTsBuckets, &ts);
}
if (tDecodeU64(&decoder, &pInfo->numBuckets) < 0) return -1;
int32_t sBfSize = 0;
if (tDecodeI32(&decoder, &sBfSize) < 0) return -1;
pInfo->pTsSBFs = taosArrayInit(sBfSize, sizeof(void *));
for (int32_t i = 0; i < sBfSize; i++) {
SScalableBf* pSBf = tScalableBfDecode(&decoder);
if (!pSBf) return -1;
taosArrayPush(pInfo->pTsSBFs, &pSBf);
}
if (tDecodeU64(&decoder, &pInfo->numSBFs) < 0) return -1;
if (tDecodeI64(&decoder, &pInfo->interval) < 0) return -1;
if (tDecodeI64(&decoder, &pInfo->watermark) < 0) return -1;
if (tDecodeI64(&decoder, &pInfo->minTS) < 0) return -1;
pInfo->pCloseWinSBF = tScalableBfDecode(&decoder);
int32_t mapSize = 0;
if (tDecodeI32(&decoder, &mapSize) < 0) return -1;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pMap = taosHashInit(mapSize, hashFn, true, HASH_NO_LOCK);
uint64_t uid = 0;
ts = INT64_MIN;
for(int32_t i = 0; i < mapSize; i++) {
if (tDecodeU64(&decoder, &uid) < 0) return -1;
if (tDecodeI64(&decoder, &ts) < 0) return -1;
taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &ts, sizeof(TSKEY));
}
ASSERT(mapSize == taosHashGetSize(pInfo->pMap));
if (tDecodeI64(&decoder, &pInfo->scanWindow.skey) < 0) return -1;
if (tDecodeI64(&decoder, &pInfo->scanWindow.ekey) < 0) return -1;
if (tDecodeU64(&decoder, &pInfo->scanGroupId) < 0) return -1;
if (tDecodeU64(&decoder, &pInfo->maxVersion) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
......@@ -6,11 +6,37 @@
using namespace std;
#define MAX_NUM_SCALABLE_BF 100000
bool equalSBF(SScalableBf* left, SScalableBf* right) {
if (left->growth != right->growth) return false;
if (left->numBits != right->numBits) return false;
int lsize = taosArrayGetSize(left->bfArray);
int rsize = taosArrayGetSize(right->bfArray);
if (lsize != rsize) return false;
for (int32_t i = 0; i < lsize; i++) {
SBloomFilter* pLeftBF = (SBloomFilter*)taosArrayGetP(left->bfArray, i);
SBloomFilter* pRightBF = (SBloomFilter*)taosArrayGetP(right->bfArray, i);
if (pLeftBF->errorRate != pRightBF->errorRate) return false;
if (pLeftBF->expectedEntries != pRightBF->expectedEntries) return false;
if (pLeftBF->hashFn1 != pRightBF->hashFn1) return false;
if (pLeftBF->hashFn2 != pRightBF->hashFn2) return false;
if (pLeftBF->hashFunctions != pRightBF->hashFunctions) return false;
if (pLeftBF->numBits != pRightBF->numBits) return false;
if (pLeftBF->numUnits != pRightBF->numUnits) return false;
if (pLeftBF->size != pRightBF->size) return false;
uint64_t* leftUint = (uint64_t*) pLeftBF->buffer;
uint64_t* rightUint = (uint64_t*) pRightBF->buffer;
for (int32_t j = 0; j < pLeftBF->numUnits; j++) {
if (leftUint[j] != rightUint[j]) return false;
}
}
return true;
}
TEST(TD_STREAM_UPDATE_TEST, update) {
int64_t interval = 20 * 1000;
int64_t watermark = 10 * 60 * 1000;
const int64_t interval = 20 * 1000;
const int64_t watermark = 10 * 60 * 1000;
SUpdateInfo *pSU = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,1, 0), true);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,1, 0), false);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,1, -1), true);
for(int i=0; i < 1024; i++) {
......@@ -31,15 +57,16 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,i, 1), true);
}
TSKEY uid = 0;
for(int i=3; i < 1024; i++) {
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,0, i), false);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU, uid, i), false);
}
GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU->pTsBuckets,0), 1023);
GTEST_ASSERT_EQ(*(TSKEY*)taosHashGet(pSU->pMap, &uid, sizeof(uint64_t)), 1023);
for(int i=3; i < 1024; i++) {
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,0, i), true);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU, uid, i), true);
}
GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU->pTsBuckets,0), 1023);
GTEST_ASSERT_EQ(*(TSKEY*)taosHashGet(pSU->pMap, &uid, sizeof(uint64_t)), 1023);
SUpdateInfo *pSU1 = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark);
for(int i=1; i <= watermark / interval; i++) {
......@@ -75,7 +102,8 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU2, 1, i * interval + 5), false);
GTEST_ASSERT_EQ(pSU2->minTS, (i-(pSU2->numSBFs-1))*interval);
GTEST_ASSERT_EQ(pSU2->numSBFs, watermark / interval);
GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU2->pTsBuckets,1), i * interval + 5);
TSKEY uid2 = 1;
GTEST_ASSERT_EQ(*(TSKEY*)taosHashGet(pSU2->pMap, &uid2, sizeof(uint64_t)), i * interval + 5);
}
SUpdateInfo *pSU3 = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark);
......@@ -84,7 +112,8 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU3, i, i * interval + 5 * j), false);
GTEST_ASSERT_EQ(pSU3->minTS, 0);
GTEST_ASSERT_EQ(pSU3->numSBFs, watermark / interval);
GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU3->pTsBuckets, i), i * interval + 5 * j);
uint64_t uid3 = i;
GTEST_ASSERT_EQ(*(TSKEY*)taosHashGet(pSU3->pMap, &uid3, sizeof(uint64_t)), i * interval + 5 * j);
SScalableBf *pSBF = (SScalableBf *)taosArrayGetP(pSU3->pTsSBFs, i);
SBloomFilter *pBF = (SBloomFilter *)taosArrayGetP(pSBF->bfArray, 0);
GTEST_ASSERT_EQ(pBF->size, j);
......@@ -92,13 +121,66 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
}
SUpdateInfo *pSU4 = updateInfoInit(-1, TSDB_TIME_PRECISION_MILLI, -1);
GTEST_ASSERT_EQ(pSU4->watermark, MAX_NUM_SCALABLE_BF * pSU4->interval);
GTEST_ASSERT_EQ(pSU4->watermark, pSU4->interval);
GTEST_ASSERT_EQ(pSU4->interval, MILLISECOND_PER_MINUTE);
SUpdateInfo *pSU5 = updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0);
GTEST_ASSERT_EQ(pSU5->watermark, MAX_NUM_SCALABLE_BF * pSU4->interval);
GTEST_ASSERT_EQ(pSU5->watermark, pSU4->interval);
GTEST_ASSERT_EQ(pSU5->interval, MILLISECOND_PER_MINUTE);
SUpdateInfo *pSU7 = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark);
updateInfoAddCloseWindowSBF(pSU7);
for(int64_t i = 1; i < 2048000; i++) {
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU7,i, i), false);
}
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU7,100, 1), true);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU7,110, 10), true);
GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU7,200, 20), true);
int32_t bufLen = updateInfoSerialize(NULL, 0, pSU7);
void* buf = taosMemoryCalloc(1, bufLen);
int32_t resSize = updateInfoSerialize(buf, bufLen, pSU7);
SUpdateInfo *pSU6 = updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0);
int32_t desSize = updateInfoDeserialize(buf, bufLen, pSU6);
GTEST_ASSERT_EQ(desSize, 0);
GTEST_ASSERT_EQ(pSU7->interval, pSU6->interval);
GTEST_ASSERT_EQ(pSU7->maxVersion, pSU6->maxVersion);
GTEST_ASSERT_EQ(pSU7->minTS, pSU6->minTS);
GTEST_ASSERT_EQ(pSU7->numBuckets, pSU6->numBuckets);
GTEST_ASSERT_EQ(pSU7->numSBFs, pSU6->numSBFs);
GTEST_ASSERT_EQ(pSU7->scanGroupId, pSU6->scanGroupId);
GTEST_ASSERT_EQ(pSU7->scanWindow.ekey, pSU6->scanWindow.ekey);
GTEST_ASSERT_EQ(pSU7->scanWindow.skey, pSU6->scanWindow.skey);
GTEST_ASSERT_EQ(pSU7->watermark, pSU6->watermark);
GTEST_ASSERT_EQ(equalSBF(pSU7->pCloseWinSBF, pSU6->pCloseWinSBF), true);
int32_t mapSize = taosHashGetSize(pSU7->pMap);
GTEST_ASSERT_EQ(mapSize, taosHashGetSize(pSU6->pMap));
void* pIte = NULL;
size_t keyLen = 0;
while ((pIte = taosHashIterate(pSU7->pMap, pIte)) != NULL) {
void* key = taosHashGetKey(pIte, &keyLen);
void* value6 = taosHashGet(pSU6->pMap, key, keyLen);
GTEST_ASSERT_EQ(*(TSKEY*)pIte, *(TSKEY*)value6);
}
int32_t buSize = taosArrayGetSize(pSU7->pTsBuckets);
GTEST_ASSERT_EQ(buSize, taosArrayGetSize(pSU6->pTsBuckets));
for (int32_t i = 0; i < buSize; i++) {
TSKEY ts1 = *(TSKEY*)taosArrayGet(pSU7->pTsBuckets, i);
TSKEY ts2 = *(TSKEY*)taosArrayGet(pSU6->pTsBuckets, i);
GTEST_ASSERT_EQ(ts1, ts2);
}
int32_t lSize = taosArrayGetSize(pSU7->pTsSBFs);
int32_t rSize = taosArrayGetSize(pSU6->pTsSBFs);
GTEST_ASSERT_EQ(lSize, rSize);
for (int32_t i = 0; i < lSize; i++) {
SScalableBf* pLeftSBF = (SScalableBf*)taosArrayGetP(pSU7->pTsSBFs, i);
SScalableBf* pRightSBF = (SScalableBf*)taosArrayGetP(pSU6->pTsSBFs, i);
GTEST_ASSERT_EQ(equalSBF(pLeftSBF, pRightSBF), true);
}
updateInfoDestroy(pSU);
updateInfoDestroy(pSU1);
......@@ -106,6 +188,9 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
updateInfoDestroy(pSU3);
updateInfoDestroy(pSU4);
updateInfoDestroy(pSU5);
updateInfoDestroy(pSU6);
updateInfoDestroy(pSU7);
}
int main(int argc, char* argv[]) {
......
......@@ -108,8 +108,41 @@ void tBloomFilterDestroy(SBloomFilter *pBF) {
taosMemoryFree(pBF);
}
void tBloomFilterDump(const struct SBloomFilter *pBF) {
// ToDo
int32_t tBloomFilterEncode(const SBloomFilter *pBF, SEncoder* pEncoder) {
if (tEncodeU32(pEncoder, pBF->hashFunctions) < 0) return -1;
if (tEncodeU64(pEncoder, pBF->expectedEntries) < 0) return -1;
if (tEncodeU64(pEncoder, pBF->numUnits) < 0) return -1;
if (tEncodeU64(pEncoder, pBF->numBits) < 0) return -1;
if (tEncodeU64(pEncoder, pBF->size) < 0) return -1;
for (uint64_t i = 0; i < pBF->numUnits; i++) {
uint64_t* pUnits = (uint64_t*)pBF->buffer;
if (tEncodeU64(pEncoder, pUnits[i]) < 0) return -1;
}
if (tEncodeDouble(pEncoder, pBF->errorRate) < 0) return -1;
return 0;
}
SBloomFilter* tBloomFilterDecode(SDecoder* pDecoder) {
SBloomFilter *pBF = taosMemoryCalloc(1, sizeof(SBloomFilter));
pBF->buffer = NULL;
if (tDecodeU32(pDecoder, &pBF->hashFunctions) < 0) goto _error;
if (tDecodeU64(pDecoder, &pBF->expectedEntries) < 0) goto _error;
if (tDecodeU64(pDecoder, &pBF->numUnits) < 0) goto _error;
if (tDecodeU64(pDecoder, &pBF->numBits) < 0) goto _error;
if (tDecodeU64(pDecoder, &pBF->size) < 0) goto _error;
pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t));
for (int32_t i = 0; i < pBF->numUnits; i++) {
uint64_t* pUnits = (uint64_t*)pBF->buffer;
if (tDecodeU64(pDecoder, pUnits + i) < 0) goto _error;
}
if (tDecodeDouble(pDecoder, &pBF->errorRate) < 0) goto _error;
pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);
pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR);
return pBF;
_error:
tBloomFilterDestroy(pBF);
return NULL;
}
bool tBloomFilterIsFull(const SBloomFilter *pBF) {
......
......@@ -101,6 +101,42 @@ void tScalableBfDestroy(SScalableBf *pSBf) {
taosMemoryFree(pSBf);
}
void tScalableBfDump(const SScalableBf *pSBf) {
// Todo;
}
\ No newline at end of file
int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder* pEncoder) {
if (!pSBf) {
if (tEncodeI32(pEncoder, 0) < 0) return -1;
return 0;
}
int32_t size = taosArrayGetSize(pSBf->bfArray);
if (tEncodeI32(pEncoder, size) < 0) return -1;
for (int32_t i = 0; i < size; i++) {
SBloomFilter* pBF = taosArrayGetP(pSBf->bfArray, i);
if (tBloomFilterEncode(pBF, pEncoder) < 0) return -1;
}
if (tEncodeU32(pEncoder, pSBf->growth) < 0) return -1;
if (tEncodeU64(pEncoder, pSBf->numBits) < 0) return -1;
return 0;
}
SScalableBf* tScalableBfDecode(SDecoder* pDecoder) {
SScalableBf *pSBf = taosMemoryCalloc(1, sizeof(SScalableBf));
pSBf->bfArray = NULL;
int32_t size = 0;
if (tDecodeI32(pDecoder, &size) < 0) goto _error;
if (size == 0) {
tScalableBfDestroy(pSBf);
return NULL;
}
pSBf->bfArray = taosArrayInit(size * 2, sizeof(void *));
for (int32_t i = 0; i < size; i++) {
SBloomFilter* pBF = tBloomFilterDecode(pDecoder);
if (!pBF) goto _error;
taosArrayPush(pSBf->bfArray, &pBF);
}
if (tDecodeU32(pDecoder, &pSBf->growth) < 0) goto _error;
if (tDecodeU64(pDecoder, &pSBf->numBits) < 0) goto _error;
return pSBf;
_error:
tScalableBfDestroy(pSBf);
return NULL;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册