diff --git a/include/libs/stream/tstreamUpdate.h b/include/libs/stream/tstreamUpdate.h index a4728e6382c5f6838a07c89ff349e716964a4d5a..78543118da026cf8558e9f9208689b3bf5cbac75 100644 --- a/include/libs/stream/tstreamUpdate.h +++ b/include/libs/stream/tstreamUpdate.h @@ -47,6 +47,8 @@ bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, u void updateInfoDestroy(SUpdateInfo *pInfo); void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo); void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo); +int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo); +int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo); #ifdef __cplusplus } diff --git a/include/util/tbloomfilter.h b/include/util/tbloomfilter.h index b168da594a172dfb84b089d92533993aaef001f8..c9ca905f8244205f40ae932b9574435808b3bb96 100644 --- a/include/util/tbloomfilter.h +++ b/include/util/tbloomfilter.h @@ -17,6 +17,7 @@ #define _TD_UTIL_BLOOMFILTER_H_ #include "os.h" +#include "tencode.h" #include "thash.h" #ifdef __cplusplus @@ -42,6 +43,8 @@ int32_t tBloomFilterNoContain(const SBloomFilter *pBF, const void *keyBuf, void tBloomFilterDestroy(SBloomFilter *pBF); void tBloomFilterDump(const SBloomFilter *pBF); bool tBloomFilterIsFull(const SBloomFilter *pBF); +int32_t tBloomFilterEncode(const SBloomFilter *pBF, SEncoder* pEncoder); +SBloomFilter* tBloomFilterDecode(SDecoder* pDecoder); #ifdef __cplusplus } diff --git a/include/util/tscalablebf.h b/include/util/tscalablebf.h index 8f88f65048ee02052329cf8e0dd655f3b8f6d48a..1386f840a84006db687ff47bb324e623be887731 100644 --- a/include/util/tscalablebf.h +++ b/include/util/tscalablebf.h @@ -33,7 +33,8 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len); int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len); void tScalableBfDestroy(SScalableBf *pSBf); -void tScalableBfDump(const SScalableBf *pSBf); +int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder* pEncoder); +SScalableBf* tScalableBfDecode(SDecoder* pDecoder); #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index ff1ef7b4b95ef9d7ae4e4bac1fdeb17b2fb31b2c..c686fa05cecf7fa611df67addd6ab9867feb3529 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -14,6 +14,7 @@ */ #include "tstreamUpdate.h" +#include "tencode.h" #include "ttime.h" #include "query.h" @@ -250,3 +251,110 @@ void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo) { tScalableBfDestroy(pInfo->pCloseWinSBF); pInfo->pCloseWinSBF = NULL; } + +int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) { + ASSERT(pInfo); + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) return -1; + + int32_t size = taosArrayGetSize(pInfo->pTsBuckets); + if (tEncodeI32(&encoder, size) < 0) return -1; + for (int32_t i = 0; i < size; i++) { + TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->pTsBuckets, i); + if (tEncodeI64(&encoder, *pTs) < 0) return -1; + } + + if (tEncodeU64(&encoder, pInfo->numBuckets) < 0) return -1; + + int32_t sBfSize = taosArrayGetSize(pInfo->pTsSBFs); + if (tEncodeI32(&encoder, sBfSize) < 0) return -1; + for (int32_t i = 0; i < sBfSize; i++) { + SScalableBf* pSBf = taosArrayGetP(pInfo->pTsSBFs, i); + if (tScalableBfEncode(pSBf, &encoder) < 0) return -1; + } + + if (tEncodeU64(&encoder, pInfo->numSBFs) < 0) return -1; + if (tEncodeI64(&encoder, pInfo->interval) < 0) return -1; + if (tEncodeI64(&encoder, pInfo->watermark) < 0) return -1; + if (tEncodeI64(&encoder, pInfo->minTS) < 0) return -1; + + if (tScalableBfEncode(pInfo->pCloseWinSBF, &encoder) < 0) return -1; + + int32_t mapSize = taosHashGetSize(pInfo->pMap); + if (tEncodeI32(&encoder, mapSize) < 0) return -1; + void* pIte = NULL; + size_t keyLen = 0; + while ((pIte = taosHashIterate(pInfo->pMap, pIte)) != NULL) { + void* key = taosHashGetKey(pIte, &keyLen); + if (tEncodeU64(&encoder, *(uint64_t*)key) < 0) return -1; + if (tEncodeI64(&encoder, *(TSKEY*)pIte) < 0) return -1; + } + + if (tEncodeI64(&encoder, pInfo->scanWindow.skey) < 0) return -1; + if (tEncodeI64(&encoder, pInfo->scanWindow.ekey) < 0) return -1; + if (tEncodeU64(&encoder, pInfo->scanGroupId) < 0) return -1; + if (tEncodeU64(&encoder, pInfo->maxVersion) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { + ASSERT(pInfo); + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + if (tStartDecode(&decoder) < 0) return -1; + + int32_t size = 0; + if (tDecodeI32(&decoder, &size) < 0) return -1; + pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY)); + TSKEY ts = INT64_MIN; + for (int32_t i = 0; i < size; i++) { + if (tDecodeI64(&decoder, &ts) < 0) return -1; + taosArrayPush(pInfo->pTsBuckets, &ts); + } + + if (tDecodeU64(&decoder, &pInfo->numBuckets) < 0) return -1; + + int32_t sBfSize = 0; + if (tDecodeI32(&decoder, &sBfSize) < 0) return -1; + pInfo->pTsSBFs = taosArrayInit(sBfSize, sizeof(void *)); + for (int32_t i = 0; i < sBfSize; i++) { + SScalableBf* pSBf = tScalableBfDecode(&decoder); + if (!pSBf) return -1; + taosArrayPush(pInfo->pTsSBFs, &pSBf); + } + + if (tDecodeU64(&decoder, &pInfo->numSBFs) < 0) return -1; + if (tDecodeI64(&decoder, &pInfo->interval) < 0) return -1; + if (tDecodeI64(&decoder, &pInfo->watermark) < 0) return -1; + if (tDecodeI64(&decoder, &pInfo->minTS) < 0) return -1; + pInfo->pCloseWinSBF = tScalableBfDecode(&decoder); + + int32_t mapSize = 0; + if (tDecodeI32(&decoder, &mapSize) < 0) return -1; + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pMap = taosHashInit(mapSize, hashFn, true, HASH_NO_LOCK); + uint64_t uid = 0; + ts = INT64_MIN; + for(int32_t i = 0; i < mapSize; i++) { + if (tDecodeU64(&decoder, &uid) < 0) return -1; + if (tDecodeI64(&decoder, &ts) < 0) return -1; + taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &ts, sizeof(TSKEY)); + } + ASSERT(mapSize == taosHashGetSize(pInfo->pMap)); + + if (tDecodeI64(&decoder, &pInfo->scanWindow.skey) < 0) return -1; + if (tDecodeI64(&decoder, &pInfo->scanWindow.ekey) < 0) return -1; + if (tDecodeU64(&decoder, &pInfo->scanGroupId) < 0) return -1; + if (tDecodeU64(&decoder, &pInfo->maxVersion) < 0) return -1; + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} diff --git a/source/libs/stream/test/tstreamUpdateTest.cpp b/source/libs/stream/test/tstreamUpdateTest.cpp index 93e114db020591dc703b2876cbecc25e1c363011..933549b8a6b423bc5a3a497ecdbee13ffc7fb448 100644 --- a/source/libs/stream/test/tstreamUpdateTest.cpp +++ b/source/libs/stream/test/tstreamUpdateTest.cpp @@ -6,11 +6,37 @@ using namespace std; #define MAX_NUM_SCALABLE_BF 100000 +bool equalSBF(SScalableBf* left, SScalableBf* right) { + if (left->growth != right->growth) return false; + if (left->numBits != right->numBits) return false; + int lsize = taosArrayGetSize(left->bfArray); + int rsize = taosArrayGetSize(right->bfArray); + if (lsize != rsize) return false; + for (int32_t i = 0; i < lsize; i++) { + SBloomFilter* pLeftBF = (SBloomFilter*)taosArrayGetP(left->bfArray, i); + SBloomFilter* pRightBF = (SBloomFilter*)taosArrayGetP(right->bfArray, i); + if (pLeftBF->errorRate != pRightBF->errorRate) return false; + if (pLeftBF->expectedEntries != pRightBF->expectedEntries) return false; + if (pLeftBF->hashFn1 != pRightBF->hashFn1) return false; + if (pLeftBF->hashFn2 != pRightBF->hashFn2) return false; + if (pLeftBF->hashFunctions != pRightBF->hashFunctions) return false; + if (pLeftBF->numBits != pRightBF->numBits) return false; + if (pLeftBF->numUnits != pRightBF->numUnits) return false; + if (pLeftBF->size != pRightBF->size) return false; + uint64_t* leftUint = (uint64_t*) pLeftBF->buffer; + uint64_t* rightUint = (uint64_t*) pRightBF->buffer; + for (int32_t j = 0; j < pLeftBF->numUnits; j++) { + if (leftUint[j] != rightUint[j]) return false; + } + } + return true; +} + TEST(TD_STREAM_UPDATE_TEST, update) { - int64_t interval = 20 * 1000; - int64_t watermark = 10 * 60 * 1000; + const int64_t interval = 20 * 1000; + const int64_t watermark = 10 * 60 * 1000; SUpdateInfo *pSU = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark); - GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,1, 0), true); + GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,1, 0), false); GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,1, -1), true); for(int i=0; i < 1024; i++) { @@ -31,15 +57,16 @@ TEST(TD_STREAM_UPDATE_TEST, update) { GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,i, 1), true); } + TSKEY uid = 0; for(int i=3; i < 1024; i++) { - GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,0, i), false); + GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU, uid, i), false); } - GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU->pTsBuckets,0), 1023); + GTEST_ASSERT_EQ(*(TSKEY*)taosHashGet(pSU->pMap, &uid, sizeof(uint64_t)), 1023); for(int i=3; i < 1024; i++) { - GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU,0, i), true); + GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU, uid, i), true); } - GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU->pTsBuckets,0), 1023); + GTEST_ASSERT_EQ(*(TSKEY*)taosHashGet(pSU->pMap, &uid, sizeof(uint64_t)), 1023); SUpdateInfo *pSU1 = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark); for(int i=1; i <= watermark / interval; i++) { @@ -75,7 +102,8 @@ TEST(TD_STREAM_UPDATE_TEST, update) { GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU2, 1, i * interval + 5), false); GTEST_ASSERT_EQ(pSU2->minTS, (i-(pSU2->numSBFs-1))*interval); GTEST_ASSERT_EQ(pSU2->numSBFs, watermark / interval); - GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU2->pTsBuckets,1), i * interval + 5); + TSKEY uid2 = 1; + GTEST_ASSERT_EQ(*(TSKEY*)taosHashGet(pSU2->pMap, &uid2, sizeof(uint64_t)), i * interval + 5); } SUpdateInfo *pSU3 = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark); @@ -84,7 +112,8 @@ TEST(TD_STREAM_UPDATE_TEST, update) { GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU3, i, i * interval + 5 * j), false); GTEST_ASSERT_EQ(pSU3->minTS, 0); GTEST_ASSERT_EQ(pSU3->numSBFs, watermark / interval); - GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU3->pTsBuckets, i), i * interval + 5 * j); + uint64_t uid3 = i; + GTEST_ASSERT_EQ(*(TSKEY*)taosHashGet(pSU3->pMap, &uid3, sizeof(uint64_t)), i * interval + 5 * j); SScalableBf *pSBF = (SScalableBf *)taosArrayGetP(pSU3->pTsSBFs, i); SBloomFilter *pBF = (SBloomFilter *)taosArrayGetP(pSBF->bfArray, 0); GTEST_ASSERT_EQ(pBF->size, j); @@ -92,13 +121,66 @@ TEST(TD_STREAM_UPDATE_TEST, update) { } SUpdateInfo *pSU4 = updateInfoInit(-1, TSDB_TIME_PRECISION_MILLI, -1); - GTEST_ASSERT_EQ(pSU4->watermark, MAX_NUM_SCALABLE_BF * pSU4->interval); + GTEST_ASSERT_EQ(pSU4->watermark, pSU4->interval); GTEST_ASSERT_EQ(pSU4->interval, MILLISECOND_PER_MINUTE); SUpdateInfo *pSU5 = updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0); - GTEST_ASSERT_EQ(pSU5->watermark, MAX_NUM_SCALABLE_BF * pSU4->interval); + GTEST_ASSERT_EQ(pSU5->watermark, pSU4->interval); GTEST_ASSERT_EQ(pSU5->interval, MILLISECOND_PER_MINUTE); + SUpdateInfo *pSU7 = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark); + updateInfoAddCloseWindowSBF(pSU7); + for(int64_t i = 1; i < 2048000; i++) { + GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU7,i, i), false); + } + GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU7,100, 1), true); + GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU7,110, 10), true); + GTEST_ASSERT_EQ(updateInfoIsUpdated(pSU7,200, 20), true); + + int32_t bufLen = updateInfoSerialize(NULL, 0, pSU7); + void* buf = taosMemoryCalloc(1, bufLen); + int32_t resSize = updateInfoSerialize(buf, bufLen, pSU7); + + SUpdateInfo *pSU6 = updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0); + int32_t desSize = updateInfoDeserialize(buf, bufLen, pSU6); + GTEST_ASSERT_EQ(desSize, 0); + + GTEST_ASSERT_EQ(pSU7->interval, pSU6->interval); + GTEST_ASSERT_EQ(pSU7->maxVersion, pSU6->maxVersion); + GTEST_ASSERT_EQ(pSU7->minTS, pSU6->minTS); + GTEST_ASSERT_EQ(pSU7->numBuckets, pSU6->numBuckets); + GTEST_ASSERT_EQ(pSU7->numSBFs, pSU6->numSBFs); + GTEST_ASSERT_EQ(pSU7->scanGroupId, pSU6->scanGroupId); + GTEST_ASSERT_EQ(pSU7->scanWindow.ekey, pSU6->scanWindow.ekey); + GTEST_ASSERT_EQ(pSU7->scanWindow.skey, pSU6->scanWindow.skey); + GTEST_ASSERT_EQ(pSU7->watermark, pSU6->watermark); + GTEST_ASSERT_EQ(equalSBF(pSU7->pCloseWinSBF, pSU6->pCloseWinSBF), true); + + int32_t mapSize = taosHashGetSize(pSU7->pMap); + GTEST_ASSERT_EQ(mapSize, taosHashGetSize(pSU6->pMap)); + void* pIte = NULL; + size_t keyLen = 0; + while ((pIte = taosHashIterate(pSU7->pMap, pIte)) != NULL) { + void* key = taosHashGetKey(pIte, &keyLen); + void* value6 = taosHashGet(pSU6->pMap, key, keyLen); + GTEST_ASSERT_EQ(*(TSKEY*)pIte, *(TSKEY*)value6); + } + + int32_t buSize = taosArrayGetSize(pSU7->pTsBuckets); + GTEST_ASSERT_EQ(buSize, taosArrayGetSize(pSU6->pTsBuckets)); + for (int32_t i = 0; i < buSize; i++) { + TSKEY ts1 = *(TSKEY*)taosArrayGet(pSU7->pTsBuckets, i); + TSKEY ts2 = *(TSKEY*)taosArrayGet(pSU6->pTsBuckets, i); + GTEST_ASSERT_EQ(ts1, ts2); + } + int32_t lSize = taosArrayGetSize(pSU7->pTsSBFs); + int32_t rSize = taosArrayGetSize(pSU6->pTsSBFs); + GTEST_ASSERT_EQ(lSize, rSize); + for (int32_t i = 0; i < lSize; i++) { + SScalableBf* pLeftSBF = (SScalableBf*)taosArrayGetP(pSU7->pTsSBFs, i); + SScalableBf* pRightSBF = (SScalableBf*)taosArrayGetP(pSU6->pTsSBFs, i); + GTEST_ASSERT_EQ(equalSBF(pLeftSBF, pRightSBF), true); + } updateInfoDestroy(pSU); updateInfoDestroy(pSU1); @@ -106,6 +188,9 @@ TEST(TD_STREAM_UPDATE_TEST, update) { updateInfoDestroy(pSU3); updateInfoDestroy(pSU4); updateInfoDestroy(pSU5); + updateInfoDestroy(pSU6); + updateInfoDestroy(pSU7); + } int main(int argc, char* argv[]) { diff --git a/source/util/src/tbloomfilter.c b/source/util/src/tbloomfilter.c index 52c541ae2e6b66f20f15642a92e7f4b8b57c5986..945cb58fcc58ebee4c64c9fcdcabe89921e53116 100644 --- a/source/util/src/tbloomfilter.c +++ b/source/util/src/tbloomfilter.c @@ -108,8 +108,41 @@ void tBloomFilterDestroy(SBloomFilter *pBF) { taosMemoryFree(pBF); } -void tBloomFilterDump(const struct SBloomFilter *pBF) { -// ToDo +int32_t tBloomFilterEncode(const SBloomFilter *pBF, SEncoder* pEncoder) { + if (tEncodeU32(pEncoder, pBF->hashFunctions) < 0) return -1; + if (tEncodeU64(pEncoder, pBF->expectedEntries) < 0) return -1; + if (tEncodeU64(pEncoder, pBF->numUnits) < 0) return -1; + if (tEncodeU64(pEncoder, pBF->numBits) < 0) return -1; + if (tEncodeU64(pEncoder, pBF->size) < 0) return -1; + for (uint64_t i = 0; i < pBF->numUnits; i++) { + uint64_t* pUnits = (uint64_t*)pBF->buffer; + if (tEncodeU64(pEncoder, pUnits[i]) < 0) return -1; + } + if (tEncodeDouble(pEncoder, pBF->errorRate) < 0) return -1; + return 0; +} + +SBloomFilter* tBloomFilterDecode(SDecoder* pDecoder) { + SBloomFilter *pBF = taosMemoryCalloc(1, sizeof(SBloomFilter)); + pBF->buffer = NULL; + if (tDecodeU32(pDecoder, &pBF->hashFunctions) < 0) goto _error; + if (tDecodeU64(pDecoder, &pBF->expectedEntries) < 0) goto _error; + if (tDecodeU64(pDecoder, &pBF->numUnits) < 0) goto _error; + if (tDecodeU64(pDecoder, &pBF->numBits) < 0) goto _error; + if (tDecodeU64(pDecoder, &pBF->size) < 0) goto _error; + pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t)); + for (int32_t i = 0; i < pBF->numUnits; i++) { + uint64_t* pUnits = (uint64_t*)pBF->buffer; + if (tDecodeU64(pDecoder, pUnits + i) < 0) goto _error; + } + if (tDecodeDouble(pDecoder, &pBF->errorRate) < 0) goto _error; + pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP); + pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR); + return pBF; + +_error: + tBloomFilterDestroy(pBF); + return NULL; } bool tBloomFilterIsFull(const SBloomFilter *pBF) { diff --git a/source/util/src/tscalablebf.c b/source/util/src/tscalablebf.c index 9ddac44e201905237fdc5a1f06950d24c699246a..108eb348034afb8bd9eae458df5a78e674651833 100644 --- a/source/util/src/tscalablebf.c +++ b/source/util/src/tscalablebf.c @@ -101,6 +101,42 @@ void tScalableBfDestroy(SScalableBf *pSBf) { taosMemoryFree(pSBf); } -void tScalableBfDump(const SScalableBf *pSBf) { - // Todo; -} \ No newline at end of file +int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder* pEncoder) { + if (!pSBf) { + if (tEncodeI32(pEncoder, 0) < 0) return -1; + return 0; + } + int32_t size = taosArrayGetSize(pSBf->bfArray); + if (tEncodeI32(pEncoder, size) < 0) return -1; + for (int32_t i = 0; i < size; i++) { + SBloomFilter* pBF = taosArrayGetP(pSBf->bfArray, i); + if (tBloomFilterEncode(pBF, pEncoder) < 0) return -1; + } + if (tEncodeU32(pEncoder, pSBf->growth) < 0) return -1; + if (tEncodeU64(pEncoder, pSBf->numBits) < 0) return -1; + return 0; +} + +SScalableBf* tScalableBfDecode(SDecoder* pDecoder) { + SScalableBf *pSBf = taosMemoryCalloc(1, sizeof(SScalableBf)); + pSBf->bfArray = NULL; + int32_t size = 0; + if (tDecodeI32(pDecoder, &size) < 0) goto _error; + if (size == 0) { + tScalableBfDestroy(pSBf); + return NULL; + } + pSBf->bfArray = taosArrayInit(size * 2, sizeof(void *)); + for (int32_t i = 0; i < size; i++) { + SBloomFilter* pBF = tBloomFilterDecode(pDecoder); + if (!pBF) goto _error; + taosArrayPush(pSBf->bfArray, &pBF); + } + if (tDecodeU32(pDecoder, &pSBf->growth) < 0) goto _error; + if (tDecodeU64(pDecoder, &pSBf->numBits) < 0) goto _error; + return pSBf; + +_error: + tScalableBfDestroy(pSBf); + return NULL; +}