diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 53fc07c3f368c769ce01242f39220a85da009458..6cb7d8852310082e0f1431c265c01752f5d527b7 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -248,6 +248,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData tb_uid_t suid); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); +int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf); static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8316e6ef50eb6990bc08697a35aabac9e7175a48..51f2de481d120cfb6a044565fb0a482297e8660a 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -327,6 +327,7 @@ struct SStreamTask { int64_t checkpointingId; int32_t checkpointAlignCnt; struct SStreamMeta* pMeta; + SSHashObj* pNameMap; }; // meta diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 24e978b0ea10f19ddbfdec5756d81e0fd35626a8..033fbb0ef164551f24e95be6e40b0844d763f2e9 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2465,19 +2465,31 @@ _end: } char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { - if (stbFullName[0] == 0) { + char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1); + if (!pBuf) { + return NULL; + } + int32_t code = buildCtbNameByGroupIdImpl(stbFullName, groupId, pBuf); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pBuf); return NULL; } + return pBuf; +} + +int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, char* cname) { + if (stbFullName[0] == 0) { + return TSDB_CODE_FAILED; + } SArray* tags = taosArrayInit(0, sizeof(SSmlKv)); if (tags == NULL) { - return NULL; + return TSDB_CODE_FAILED; } - void* cname = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1); if (cname == NULL) { taosArrayDestroy(tags); - return NULL; + return TSDB_CODE_FAILED; } SSmlKv pTag = {.key = "group_id", @@ -2499,9 +2511,9 @@ char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { taosArrayDestroy(tags); if ((rname.ctbShortName && rname.ctbShortName[0]) == 0) { - return NULL; + return TSDB_CODE_FAILED; } - return rname.ctbShortName; + return TSDB_CODE_SUCCESS; } int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index db1b5ed9029dc53c7907d2322cf1f9c648268b2a..9349c6eb0dd9f36073aceb7d76d769df9871e92a 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -298,10 +298,8 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d if (res == TSDB_CODE_SUCCESS) { memcpy(ctbName, pTableSinkInfo->tbName, strlen(pTableSinkInfo->tbName)); } else { - char* tmp = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); - memcpy(ctbName, tmp, strlen(tmp)); - memcpy(pTableSinkInfo->tbName, tmp, strlen(tmp)); - taosMemoryFree(tmp); + buildCtbNameByGroupIdImpl(stbFullName, pDataBlock->info.id.groupId, ctbName); + memcpy(pTableSinkInfo->tbName, ctbName, strlen(ctbName)); tqDebug("vgId:%d, gropuId:%" PRIu64 " datablock table name is null", TD_VID(pVnode), pDataBlock->info.id.groupId); } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 9cb0a5664493ab444c1e81d778f700c53cbcefdc..922a1f534592dd11a1af7f34592a8251f178ec7b 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -15,6 +15,13 @@ #include "streamInc.h" +#define MAX_BLOCK_NAME_NUM 1024 + +typedef struct SBlockName { + uint32_t hashValue; + char parTbName[TSDB_TABLE_NAME_LEN]; +} SBlockName; + int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; @@ -331,26 +338,46 @@ FAIL: int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz, int64_t groupId) { - char* ctbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN); - if (ctbName == NULL) { - return -1; + uint32_t hashValue = 0; + SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + if (pTask->pNameMap == NULL) { + pTask->pNameMap = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); } - if (pDataBlock->info.parTbName[0]) { - snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName); + void* pVal = tSimpleHashGet(pTask->pNameMap, &groupId, sizeof(int64_t)); + if (pVal) { + SBlockName* pBln = (SBlockName*)pVal; + hashValue = pBln->hashValue; + if (!pDataBlock->info.parTbName[0]) { + memcpy(pDataBlock->info.parTbName, pBln->parTbName, strlen(pBln->parTbName)); + } } else { - char* ctbShortName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, groupId); - snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, ctbShortName); - taosMemoryFree(ctbShortName); - } + char* ctbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN); + if (ctbName == NULL) { + return -1; + } - SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + if (pDataBlock->info.parTbName[0]) { + snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName); + } else { + buildCtbNameByGroupIdImpl(pTask->shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName); + snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName); + } - /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/ - SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo; - uint32_t hashValue = - taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix); - taosMemoryFree(ctbName); + SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + + /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/ + SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo; + hashValue = + taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix); + taosMemoryFree(ctbName); + SBlockName bln = {0}; + bln.hashValue = hashValue; + memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName)); + if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) { + tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName)); + } + } bool found = false; // TODO: optimize search diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index a0caffd41fc33c16de5e3b6cbe078d8b14cad94e..284d1ecab63423a7b52e80c23d197639d36b6844 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -224,5 +224,9 @@ void tFreeStreamTask(SStreamTask* pTask) { taosMemoryFree((void*)pTask->id.idStr); } + if (pTask->pNameMap) { + tSimpleHashCleanup(pTask->pNameMap); + } + taosMemoryFree(pTask); }