提交 1e9c4d5f 编写于 作者: L liuyao

opt stream block dispatch

上级 39b589a5
...@@ -248,6 +248,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData ...@@ -248,6 +248,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData
tb_uid_t suid); tb_uid_t suid);
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf);
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock); return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock);
......
...@@ -327,6 +327,7 @@ struct SStreamTask { ...@@ -327,6 +327,7 @@ struct SStreamTask {
int64_t checkpointingId; int64_t checkpointingId;
int32_t checkpointAlignCnt; int32_t checkpointAlignCnt;
struct SStreamMeta* pMeta; struct SStreamMeta* pMeta;
SSHashObj* pNameMap;
}; };
// meta // meta
......
...@@ -2465,19 +2465,31 @@ _end: ...@@ -2465,19 +2465,31 @@ _end:
} }
char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
if (stbFullName[0] == 0) { char* pBuf = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
if (!pBuf) {
return NULL;
}
int32_t code = buildCtbNameByGroupIdImpl(stbFullName, groupId, pBuf);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pBuf);
return NULL; return NULL;
} }
return pBuf;
}
int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, char* cname) {
if (stbFullName[0] == 0) {
return TSDB_CODE_FAILED;
}
SArray* tags = taosArrayInit(0, sizeof(SSmlKv)); SArray* tags = taosArrayInit(0, sizeof(SSmlKv));
if (tags == NULL) { if (tags == NULL) {
return NULL; return TSDB_CODE_FAILED;
} }
void* cname = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
if (cname == NULL) { if (cname == NULL) {
taosArrayDestroy(tags); taosArrayDestroy(tags);
return NULL; return TSDB_CODE_FAILED;
} }
SSmlKv pTag = {.key = "group_id", SSmlKv pTag = {.key = "group_id",
...@@ -2499,9 +2511,9 @@ char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) { ...@@ -2499,9 +2511,9 @@ char* buildCtbNameByGroupId(const char* stbFullName, uint64_t groupId) {
taosArrayDestroy(tags); taosArrayDestroy(tags);
if ((rname.ctbShortName && rname.ctbShortName[0]) == 0) { if ((rname.ctbShortName && rname.ctbShortName[0]) == 0) {
return NULL; return TSDB_CODE_FAILED;
} }
return rname.ctbShortName; return TSDB_CODE_SUCCESS;
} }
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
......
...@@ -298,10 +298,8 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ...@@ -298,10 +298,8 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
if (res == TSDB_CODE_SUCCESS) { if (res == TSDB_CODE_SUCCESS) {
memcpy(ctbName, pTableSinkInfo->tbName, strlen(pTableSinkInfo->tbName)); memcpy(ctbName, pTableSinkInfo->tbName, strlen(pTableSinkInfo->tbName));
} else { } else {
char* tmp = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); buildCtbNameByGroupIdImpl(stbFullName, pDataBlock->info.id.groupId, ctbName);
memcpy(ctbName, tmp, strlen(tmp)); memcpy(pTableSinkInfo->tbName, ctbName, strlen(ctbName));
memcpy(pTableSinkInfo->tbName, tmp, strlen(tmp));
taosMemoryFree(tmp);
tqDebug("vgId:%d, gropuId:%" PRIu64 " datablock table name is null", TD_VID(pVnode), tqDebug("vgId:%d, gropuId:%" PRIu64 " datablock table name is null", TD_VID(pVnode),
pDataBlock->info.id.groupId); pDataBlock->info.id.groupId);
} }
......
...@@ -15,6 +15,13 @@ ...@@ -15,6 +15,13 @@
#include "streamInc.h" #include "streamInc.h"
#define MAX_BLOCK_NAME_NUM 1024
typedef struct SBlockName {
uint32_t hashValue;
char parTbName[TSDB_TABLE_NAME_LEN];
} SBlockName;
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
...@@ -331,26 +338,46 @@ FAIL: ...@@ -331,26 +338,46 @@ FAIL:
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz, int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz,
int64_t groupId) { int64_t groupId) {
char* ctbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN); uint32_t hashValue = 0;
if (ctbName == NULL) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
return -1; if (pTask->pNameMap == NULL) {
pTask->pNameMap = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
} }
if (pDataBlock->info.parTbName[0]) { void* pVal = tSimpleHashGet(pTask->pNameMap, &groupId, sizeof(int64_t));
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName); if (pVal) {
SBlockName* pBln = (SBlockName*)pVal;
hashValue = pBln->hashValue;
if (!pDataBlock->info.parTbName[0]) {
memcpy(pDataBlock->info.parTbName, pBln->parTbName, strlen(pBln->parTbName));
}
} else { } else {
char* ctbShortName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, groupId); char* ctbName = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, ctbShortName); if (ctbName == NULL) {
taosMemoryFree(ctbShortName); return -1;
} }
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; if (pDataBlock->info.parTbName[0]) {
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
} else {
buildCtbNameByGroupIdImpl(pTask->shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
}
/*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/ SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo;
uint32_t hashValue = /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix); SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo;
taosMemoryFree(ctbName); hashValue =
taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
taosMemoryFree(ctbName);
SBlockName bln = {0};
bln.hashValue = hashValue;
memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName));
if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) {
tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName));
}
}
bool found = false; bool found = false;
// TODO: optimize search // TODO: optimize search
......
...@@ -224,5 +224,9 @@ void tFreeStreamTask(SStreamTask* pTask) { ...@@ -224,5 +224,9 @@ void tFreeStreamTask(SStreamTask* pTask) {
taosMemoryFree((void*)pTask->id.idStr); taosMemoryFree((void*)pTask->id.idStr);
} }
if (pTask->pNameMap) {
tSimpleHashCleanup(pTask->pNameMap);
}
taosMemoryFree(pTask); taosMemoryFree(pTask);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册