From 035e6b13c8fd492b8d0afb8f38499cfb95f94665 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 21 Oct 2022 09:47:04 +0800 Subject: [PATCH] enh(stream): internal optimize --- include/common/tmsg.h | 5 +-- include/libs/stream/tstream.h | 4 +++ include/libs/stream/tstreamUpdate.h | 7 ++++ include/util/thash.h | 3 ++ source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/src/mndDef.c | 6 ++++ source/dnode/mnode/impl/src/mndStream.c | 3 +- source/libs/executor/inc/executorimpl.h | 2 ++ source/libs/executor/src/executorimpl.c | 6 ++-- source/libs/executor/src/scanoperator.c | 13 ++++--- source/libs/executor/src/timewindowoperator.c | 9 ++--- source/libs/stream/src/streamMeta.c | 7 ++++ source/libs/stream/src/streamTask.c | 16 ++++++--- source/libs/stream/src/streamUpdate.c | 34 +++++++++++++++++-- source/util/src/tbloomfilter.c | 8 +++-- source/util/src/thashutil.c | 17 ++++++++++ 16 files changed, 118 insertions(+), 23 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 3409071932..4356c0fd24 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1726,13 +1726,14 @@ typedef struct { char name[TSDB_STREAM_FNAME_LEN]; char sourceDB[TSDB_DB_FNAME_LEN]; char targetStbFullName[TSDB_TABLE_FNAME_LEN]; - int8_t igExists; char* sql; char* ast; + int8_t igExists; int8_t triggerType; + int8_t igExpired; + int8_t fillHistory; // process data inserted before creating stream int64_t maxDelay; int64_t watermark; - int8_t igExpired; int32_t numOfTags; SArray* pTags; // array of SField } SCMCreateStreamReq; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index bdc12f7e3f..247293dbb6 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -36,6 +36,7 @@ typedef struct SStreamTask SStreamTask; enum { STREAM_STATUS__NORMAL = 0, STREAM_STATUS__STOP, + STREAM_STATUS__INIT, STREAM_STATUS__FAILED, STREAM_STATUS__RECOVER, }; @@ -291,6 +292,9 @@ typedef struct SStreamTask { int64_t recoverSnapVer; int64_t startVer; + // fill history + int8_t fillHistory; + // children info SArray* childEpInfo; // SArray int32_t nextCheckId; diff --git a/include/libs/stream/tstreamUpdate.h b/include/libs/stream/tstreamUpdate.h index c186430f3f..1c490852f9 100644 --- a/include/libs/stream/tstreamUpdate.h +++ b/include/libs/stream/tstreamUpdate.h @@ -17,6 +17,7 @@ #include "taosdef.h" #include "tarray.h" +#include "tcommon.h" #include "tmsg.h" #include "tscalablebf.h" @@ -24,6 +25,11 @@ extern "C" { #endif +typedef struct SUpdateKey { + int64_t tbUid; + TSKEY ts; +} SUpdateKey; + typedef struct SUpdateInfo { SArray *pTsBuckets; uint64_t numBuckets; @@ -41,6 +47,7 @@ typedef struct SUpdateInfo { SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark); SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark); +void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol); bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid); void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version); diff --git a/include/util/thash.h b/include/util/thash.h index 2be5f4a047..08caad495d 100644 --- a/include/util/thash.h +++ b/include/util/thash.h @@ -50,6 +50,9 @@ uint64_t MurmurHash3_64(const char *key, uint32_t len); uint32_t taosIntHash_32(const char *key, uint32_t len); uint32_t taosIntHash_64(const char *key, uint32_t len); +uint32_t taosFastHash(const char *key, uint32_t len); +uint32_t taosDJB2Hash(const char *key, uint32_t len); + _hash_fn_t taosGetDefaultHashFunction(int32_t type); _equal_fn_t taosGetDefaultEqualFunction(int32_t type); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 2ee732e797..74a92c9fcd 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -613,6 +613,7 @@ typedef struct { // config int8_t igExpired; int8_t trigger; + int8_t fillHistory; int64_t triggerParam; int64_t watermark; // source and target diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 462b068a73..143131bac8 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -18,6 +18,7 @@ #include "mndConsumer.h" int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { + if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1; @@ -31,6 +32,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { if (tEncodeI8(pEncoder, pObj->igExpired) < 0) return -1; if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1; + if (tEncodeI8(pEncoder, pObj->fillHistory) < 0) return -1; if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1; if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1; @@ -74,10 +76,12 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1; + tEndEncode(pEncoder); return pEncoder->pos; } int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { + if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1; @@ -91,6 +95,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { if (tDecodeI8(pDecoder, &pObj->igExpired) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1; + if (tDecodeI8(pDecoder, &pObj->fillHistory) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1; @@ -134,6 +139,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1; + tEndDecode(pDecoder); return 0; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ce6b3b0656..6fffdf9a59 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -280,6 +280,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, pObj->trigger = pCreate->triggerType; pObj->triggerParam = pCreate->maxDelay; pObj->watermark = pCreate->watermark; + pObj->fillHistory = pCreate->fillHistory; memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN); SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB); @@ -686,7 +687,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); goto _OVER; } - mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb); // hack way + mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb); mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name); // create stb for stream diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1aa3d55efc..0ea0eb400d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -162,6 +162,8 @@ typedef struct { SQueryTableDataCond tableCond; int64_t recoverStartVer; int64_t recoverEndVer; + int64_t fillHistoryVer1; + int64_t fillHistoryVer2; SStreamState* pState; } SStreamTaskInfo; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index feced8dc60..914373a600 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -569,8 +569,10 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc return TSDB_CODE_SUCCESS; } - pResult->info.groupId = pSrcBlock->info.groupId; - memcpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN); + if (pResult != pSrcBlock) { + pResult->info.groupId = pSrcBlock->info.groupId; + memcpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN); + } // if the source equals to the destination, it is to create a new column as the result of scalar // function or some operators. diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 058b9b1b44..a8869448d9 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -284,7 +284,6 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, return true; } - static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { if (pTableScanInfo->pseudoSup.numOfExprs > 0) { SExprSupp* pSup = &pTableScanInfo->pseudoSup; @@ -737,7 +736,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; int32_t numOfCols = 0; - pInfo->pColMatchInfo = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); + pInfo->pColMatchInfo = + extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { @@ -1707,9 +1707,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } #endif +#if 1 if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE) { STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; memcpy(&pTSInfo->cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond)); + pTSInfo->cond.startVersion = -1; + pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1; pTSInfo->scanTimes = 0; pTSInfo->currentGroupId = -1; pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN; @@ -1718,12 +1721,14 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) { SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp); if (pBlock != NULL) { + calBlockTbName(&pInfo->tbnameCalSup, pBlock); + updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex); return pBlock; } - // TODO fill in bloom filter pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE; return NULL; } +#endif size_t total = taosArrayGetSize(pInfo->pBlockLists); // TODO: refactor @@ -2109,7 +2114,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL, NULL, NULL); return pOperator; - _end: +_end: taosMemoryFree(pInfo); taosMemoryFree(pOperator); pTaskInfo->code = code; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 505654d967..dc7b11758f 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2667,8 +2667,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { @@ -3149,6 +3149,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SOperatorInfo* downstream = pOperator->pDownstream[0]; TSKEY maxTs = INT64_MIN; @@ -3191,6 +3192,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } else { deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval, &pInfo->delKey); + streamStateCommit(pTaskInfo->streamInfo.pState); } return NULL; } else { @@ -4975,8 +4977,6 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, goto _error; } - - SInterval interval = {.interval = pNode->interval, .sliding = pNode->sliding, .intervalUnit = pNode->intervalUnit, @@ -5383,6 +5383,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval, &pInfo->delKey); doSetOperatorCompleted(pOperator); + streamStateCommit(pTaskInfo->streamInfo.pState); return NULL; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 02b74c260b..d489cc044b 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -113,6 +113,13 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* ASSERT(0); goto FAIL; } + + if (pTask->fillHistory) { + // pipeline exec + // if finished, dispatch a stream-prepare-finished msg to downstream task + // set status normal + } + return 0; FAIL: diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 5304938195..f7252ed8a0 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -49,7 +49,7 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) { } int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { - /*if (tStartEncode(pEncoder) < 0) return -1;*/ + if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->totalLevel) < 0) return -1; @@ -64,6 +64,10 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->recoverSnapVer) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->startVer) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->fillHistory) < 0) return -1; + int32_t epSz = taosArrayGetSize(pTask->childEpInfo); if (tEncodeI32(pEncoder, epSz) < 0) return -1; for (int32_t i = 0; i < epSz; i++) { @@ -93,12 +97,12 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { } if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1; - /*tEndEncode(pEncoder);*/ + tEndEncode(pEncoder); return pEncoder->pos; } int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { - /*if (tStartDecode(pDecoder) < 0) return -1;*/ + if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->totalLevel) < 0) return -1; @@ -113,6 +117,10 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->recoverSnapVer) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->startVer) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->fillHistory) < 0) return -1; + int32_t epSz; if (tDecodeI32(pDecoder, &epSz) < 0) return -1; pTask->childEpInfo = taosArrayInit(epSz, sizeof(void*)); @@ -150,7 +158,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { } if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1; - /*tEndDecode(pDecoder);*/ + tEndDecode(pDecoder); return 0; } diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 5aadb599a3..80410568e5 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -14,6 +14,7 @@ */ #include "query.h" +#include "tdatablock.h" #include "tencode.h" #include "tstreamUpdate.h" #include "ttime.h" @@ -162,15 +163,42 @@ bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid) { return false; } +void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol) { + if (pBlock == NULL || pBlock->info.rows == 0) return; + TSKEY maxTs = -1; + int64_t tbUid = pBlock->info.uid; + + SColumnInfoData *pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsCol); + + for (int32_t i = 0; i < pBlock->info.rows; i++) { + TSKEY ts = ((TSKEY *)pColDataInfo->pData)[i]; + maxTs = TMAX(maxTs, ts); + SScalableBf *pSBf = getSBf(pInfo, ts); + if (pSBf) { + tScalableBfPut(pSBf, &ts, sizeof(TSKEY)); + } + } + TSKEY *pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t)); + if (pMaxTs == NULL || *pMaxTs > tbUid) { + taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), &maxTs, sizeof(TSKEY)); + } +} + bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { - int32_t res = TSDB_CODE_FAILED; + int32_t res = TSDB_CODE_FAILED; + + SUpdateKey updateKey = { + .tbUid = tableId, + .ts = ts, + }; + TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets; TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index); if (ts < maxTs - pInfo->watermark) { // this window has been closed. if (pInfo->pCloseWinSBF) { - res = tScalableBfPut(pInfo->pCloseWinSBF, &ts, sizeof(TSKEY)); + res = tScalableBfPut(pInfo->pCloseWinSBF, &updateKey, sizeof(SUpdateKey)); if (res == TSDB_CODE_SUCCESS) { return false; } else { @@ -183,7 +211,7 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { SScalableBf *pSBf = getSBf(pInfo, ts); // pSBf may be a null pointer if (pSBf) { - res = tScalableBfPut(pSBf, &ts, sizeof(TSKEY)); + res = tScalableBfPut(pSBf, &updateKey, sizeof(SUpdateKey)); } int32_t size = taosHashGetSize(pInfo->pMap); diff --git a/source/util/src/tbloomfilter.c b/source/util/src/tbloomfilter.c index f3ccbb0aac..b9c96dd606 100644 --- a/source/util/src/tbloomfilter.c +++ b/source/util/src/tbloomfilter.c @@ -57,8 +57,10 @@ SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate) { // ln(2) = 0.693147180559945 pBF->hashFunctions = (uint32_t)ceil(lnRate / 0.693147180559945); - pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP); - pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR); + /*pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);*/ + /*pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR);*/ + pBF->hashFn1 = taosFastHash; + pBF->hashFn2 = taosDJB2Hash; pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t)); if (pBF->buffer == NULL) { tBloomFilterDestroy(pBF); @@ -144,4 +146,4 @@ _error: return NULL; } -bool tBloomFilterIsFull(const SBloomFilter *pBF) { return pBF->size >= pBF->expectedEntries; } \ No newline at end of file +bool tBloomFilterIsFull(const SBloomFilter *pBF) { return pBF->size >= pBF->expectedEntries; } diff --git a/source/util/src/thashutil.c b/source/util/src/thashutil.c index e646e9aa36..59f7d389c2 100644 --- a/source/util/src/thashutil.c +++ b/source/util/src/thashutil.c @@ -32,6 +32,23 @@ (h) ^= (h) >> 16; \ } while (0) +uint32_t taosFastHash(const char *key, uint32_t len) { + uint32_t result = 0x55555555; + for (uint32_t i = 0; i < len; i++) { + result ^= (uint8_t)key[i]; + result = ROTL32(result, 5); + } + return result; +} + +uint32_t taosDJB2Hash(const char *key, uint32_t len) { + uint32_t hash = 5381; + for (uint32_t i = 0; i < len; i++) { + hash = ((hash << 5) + hash) + (uint8_t)key[i]; /* hash * 33 + c */ + } + return hash; +} + uint32_t MurmurHash3_32(const char *key, uint32_t len) { const uint8_t *data = (const uint8_t *)key; const int32_t nblocks = len >> 2u; -- GitLab