提交 8b7b8046 编写于 作者: L liuyao

opt partition by tbname

上级 8277d608
...@@ -239,6 +239,7 @@ typedef struct { ...@@ -239,6 +239,7 @@ typedef struct {
void* vnode; // not available to encoder and decoder void* vnode; // not available to encoder and decoder
FTbSink* tbSinkFunc; FTbSink* tbSinkFunc;
STSchema* pTSchema; STSchema* pTSchema;
SSHashObj* pTblInfo;
} STaskSinkTb; } STaskSinkTb;
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
......
...@@ -572,6 +572,10 @@ end: ...@@ -572,6 +572,10 @@ end:
return ret; return ret;
} }
void freePtr(void *ptr) {
taosMemoryFree(*(void**)ptr);
}
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
...@@ -646,6 +650,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -646,6 +650,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
if (pTask->tbSink.pTSchema == NULL) { if (pTask->tbSink.pTSchema == NULL) {
return -1; return -1;
} }
pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr);
} }
if (pTask->taskLevel == TASK_LEVEL__SOURCE) { if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
......
...@@ -17,6 +17,13 @@ ...@@ -17,6 +17,13 @@
#include "tmsg.h" #include "tmsg.h"
#include "tq.h" #include "tq.h"
#define MAX_CATCH_NUM 10240
typedef struct STblInfo {
uint64_t uid;
char tbName[TSDB_TABLE_NAME_LEN];
} STblInfo;
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr) { const char* pIdStr) {
int32_t totalRows = pDataBlock->info.rows; int32_t totalRows = pDataBlock->info.rows;
...@@ -90,6 +97,22 @@ end: ...@@ -90,6 +97,22 @@ end:
return ret; return ret;
} }
int32_t tqGetTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STblInfo** pTbl) {
void* pVal = tSimpleHashGet(tblInfo, &groupId, sizeof(uint64_t));
if (pVal) {
*pTbl = *(STblInfo**)pVal;
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_FAILED;
}
int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STblInfo* pTbl) {
if (tSimpleHashGetSize(tblInfo) > MAX_CATCH_NUM) {
return TSDB_CODE_SUCCESS;
}
return tSimpleHashPut(tblInfo, &groupId, sizeof(uint64_t), &pTbl, POINTER_BYTES);
}
int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
void* buf = NULL; void* buf = NULL;
int32_t tlen = 0; int32_t tlen = 0;
...@@ -260,25 +283,40 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ...@@ -260,25 +283,40 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
tbData.suid = suid; tbData.suid = suid;
tbData.uid = 0; // uid is assigned by vnode tbData.uid = 0; // uid is assigned by vnode
tbData.sver = pTSchema->version; tbData.sver = pTSchema->version;
STblInfo* pTblMeta = NULL;
int32_t res = tqGetTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, &pTblMeta);
if (res != TSDB_CODE_SUCCESS) {
pTblMeta = taosMemoryCalloc(1, sizeof(STblInfo));
}
char* ctbName = NULL; char* ctbName = pDataBlock->info.parTbName;
tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), pDataBlock->info.parTbName); if (!ctbName[0]) {
if (pDataBlock->info.parTbName[0]) { if (res == TSDB_CODE_SUCCESS) {
ctbName = taosStrdup(pDataBlock->info.parTbName); memcpy(ctbName, pTblMeta->tbName, strlen(pTblMeta->tbName));
} else { } else {
ctbName = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); char* tmp = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
memcpy(ctbName, tmp, strlen(tmp));
memcpy(pTblMeta->tbName, tmp, strlen(tmp));
taosMemoryFree(tmp);
tqDebug("vgId:%d, gropuid:%" PRIu64 " datablock tabel name is null", TD_VID(pVnode),
pDataBlock->info.id.groupId);
}
} }
if (res == TSDB_CODE_SUCCESS) {
tbData.uid = pTblMeta->uid;
} else {
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, 0); metaReaderInit(&mr, pVnode->pMeta, 0);
if (metaGetTableEntryByName(&mr, ctbName) < 0) { if (metaGetTableEntryByName(&mr, ctbName) < 0) {
metaReaderClear(&mr); metaReaderClear(&mr);
taosMemoryFree(pTblMeta);
tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName); tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName);
SVCreateTbReq* pCreateTbReq = NULL; SVCreateTbReq* pCreateTbReq = NULL;
if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) { if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) {
taosMemoryFree(ctbName);
goto _end; goto _end;
}; };
...@@ -295,7 +333,6 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ...@@ -295,7 +333,6 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
// set tag content // set tag content
tagArray = taosArrayInit(1, sizeof(STagVal)); tagArray = taosArrayInit(1, sizeof(STagVal));
if (!tagArray) { if (!tagArray) {
taosMemoryFree(ctbName);
tdDestroySVCreateTbReq(pCreateTbReq); tdDestroySVCreateTbReq(pCreateTbReq);
goto _end; goto _end;
} }
...@@ -311,11 +348,8 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ...@@ -311,11 +348,8 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
tTagNew(tagArray, 1, false, &pTag); tTagNew(tagArray, 1, false, &pTag);
tagArray = taosArrayDestroy(tagArray); tagArray = taosArrayDestroy(tagArray);
if (pTag == NULL) { if (pTag == NULL) {
taosMemoryFree(ctbName);
tdDestroySVCreateTbReq(pCreateTbReq); tdDestroySVCreateTbReq(pCreateTbReq);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(ctbName);
tdDestroySVCreateTbReq(pCreateTbReq);
goto _end; goto _end;
} }
pCreateTbReq->ctb.pTag = (uint8_t*)pTag; pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
...@@ -328,8 +362,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ...@@ -328,8 +362,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
pCreateTbReq->ctb.tagName = tagName; pCreateTbReq->ctb.tagName = tagName;
// set table name // set table name
pCreateTbReq->name = ctbName; pCreateTbReq->name = taosStrdup(ctbName);
ctbName = NULL;
tbData.pCreateTbReq = pCreateTbReq; tbData.pCreateTbReq = pCreateTbReq;
tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE; tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
...@@ -338,7 +371,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ...@@ -338,7 +371,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName, tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName,
mr.me.type); mr.me.type);
metaReaderClear(&mr); metaReaderClear(&mr);
taosMemoryFree(ctbName); taosMemoryFree(pTblMeta);
continue; continue;
} }
...@@ -347,13 +380,15 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ...@@ -347,13 +380,15 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
", actual suid %" PRId64 "", ", actual suid %" PRId64 "",
TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid); TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid);
metaReaderClear(&mr); metaReaderClear(&mr);
taosMemoryFree(ctbName); taosMemoryFree(pTblMeta);
continue; continue;
} }
tbData.uid = mr.me.uid; tbData.uid = mr.me.uid;
pTblMeta->uid = mr.me.uid;
tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTblMeta);
metaReaderClear(&mr); metaReaderClear(&mr);
taosMemoryFreeClear(ctbName); }
} }
// rows // rows
......
...@@ -1583,7 +1583,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock ...@@ -1583,7 +1583,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
// currently only the tbname pseudo column // currently only the tbname pseudo column
if (pInfo->numOfPseudoExpr > 0) { if (pInfo->numOfPseudoExpr > 0) {
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL); pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), &pTableScanInfo->base.metaCache);
// ignore the table not exists error, since this table may have been dropped during the scan procedure. // ignore the table not exists error, since this table may have been dropped during the scan procedure.
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
blockDataFreeRes((SSDataBlock*)pBlock); blockDataFreeRes((SSDataBlock*)pBlock);
......
...@@ -195,6 +195,7 @@ void tFreeStreamTask(SStreamTask* pTask) { ...@@ -195,6 +195,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
if (pTask->outputType == TASK_OUTPUT__TABLE) { if (pTask->outputType == TASK_OUTPUT__TABLE) {
tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper); tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper);
taosMemoryFree(pTask->tbSink.pTSchema); taosMemoryFree(pTask->tbSink.pTSchema);
tSimpleHashCleanup(pTask->tbSink.pTblInfo);
} }
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册