提交 23f5f7f6 编写于 作者: H Haojun Liao

Merge remote-tracking branch 'origin/enh/rocksdbSstate' into enh/rocksdbSstate

...@@ -239,6 +239,7 @@ typedef struct { ...@@ -239,6 +239,7 @@ typedef struct {
void* vnode; // not available to encoder and decoder void* vnode; // not available to encoder and decoder
FTbSink* tbSinkFunc; FTbSink* tbSinkFunc;
STSchema* pTSchema; STSchema* pTSchema;
SSHashObj* pTblInfo;
} STaskSinkTb; } STaskSinkTb;
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
......
...@@ -572,6 +572,10 @@ end: ...@@ -572,6 +572,10 @@ end:
return ret; return ret;
} }
void freePtr(void *ptr) {
taosMemoryFree(*(void**)ptr);
}
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
...@@ -646,6 +650,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -646,6 +650,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
if (pTask->tbSink.pTSchema == NULL) { if (pTask->tbSink.pTSchema == NULL) {
return -1; return -1;
} }
pTask->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr);
} }
if (pTask->taskLevel == TASK_LEVEL__SOURCE) { if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
......
...@@ -17,6 +17,13 @@ ...@@ -17,6 +17,13 @@
#include "tmsg.h" #include "tmsg.h"
#include "tq.h" #include "tq.h"
#define MAX_CATCH_NUM 10240
typedef struct STblInfo {
uint64_t uid;
char tbName[TSDB_TABLE_NAME_LEN];
} STblInfo;
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr) { const char* pIdStr) {
int32_t totalRows = pDataBlock->info.rows; int32_t totalRows = pDataBlock->info.rows;
...@@ -90,6 +97,22 @@ end: ...@@ -90,6 +97,22 @@ end:
return ret; return ret;
} }
int32_t tqGetTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STblInfo** pTbl) {
void* pVal = tSimpleHashGet(tblInfo, &groupId, sizeof(uint64_t));
if (pVal) {
*pTbl = *(STblInfo**)pVal;
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_FAILED;
}
int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STblInfo* pTbl) {
if (tSimpleHashGetSize(tblInfo) > MAX_CATCH_NUM) {
return TSDB_CODE_SUCCESS;
}
return tSimpleHashPut(tblInfo, &groupId, sizeof(uint64_t), &pTbl, POINTER_BYTES);
}
int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
void* buf = NULL; void* buf = NULL;
int32_t tlen = 0; int32_t tlen = 0;
...@@ -260,100 +283,112 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ...@@ -260,100 +283,112 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
tbData.suid = suid; tbData.suid = suid;
tbData.uid = 0; // uid is assigned by vnode tbData.uid = 0; // uid is assigned by vnode
tbData.sver = pTSchema->version; tbData.sver = pTSchema->version;
STblInfo* pTblMeta = NULL;
char* ctbName = NULL; int32_t res = tqGetTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, &pTblMeta);
tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), pDataBlock->info.parTbName); if (res != TSDB_CODE_SUCCESS) {
if (pDataBlock->info.parTbName[0]) { pTblMeta = taosMemoryCalloc(1, sizeof(STblInfo));
ctbName = taosStrdup(pDataBlock->info.parTbName);
} else {
ctbName = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
} }
SMetaReader mr = {0}; char* ctbName = pDataBlock->info.parTbName;
metaReaderInit(&mr, pVnode->pMeta, 0); if (!ctbName[0]) {
if (metaGetTableEntryByName(&mr, ctbName) < 0) { if (res == TSDB_CODE_SUCCESS) {
metaReaderClear(&mr); memcpy(ctbName, pTblMeta->tbName, strlen(pTblMeta->tbName));
tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName); } else {
char* tmp = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId);
memcpy(ctbName, tmp, strlen(tmp));
memcpy(pTblMeta->tbName, tmp, strlen(tmp));
taosMemoryFree(tmp);
tqDebug("vgId:%d, gropuid:%" PRIu64 " datablock tabel name is null", TD_VID(pVnode),
pDataBlock->info.id.groupId);
}
}
SVCreateTbReq* pCreateTbReq = NULL; if (res == TSDB_CODE_SUCCESS) {
tbData.uid = pTblMeta->uid;
} else {
SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, 0);
if (metaGetTableEntryByName(&mr, ctbName) < 0) {
metaReaderClear(&mr);
taosMemoryFree(pTblMeta);
tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName);
if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) { SVCreateTbReq* pCreateTbReq = NULL;
taosMemoryFree(ctbName);
goto _end;
};
// set const if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) {
pCreateTbReq->flags = 0; goto _end;
pCreateTbReq->type = TSDB_CHILD_TABLE; };
pCreateTbReq->ctb.suid = suid;
// set super table name // set const
SName name = {0}; pCreateTbReq->flags = 0;
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); pCreateTbReq->type = TSDB_CHILD_TABLE;
pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName); pCreateTbReq->ctb.suid = suid;
// set tag content // set super table name
tagArray = taosArrayInit(1, sizeof(STagVal)); SName name = {0};
if (!tagArray) { tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
taosMemoryFree(ctbName); pCreateTbReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName);
tdDestroySVCreateTbReq(pCreateTbReq);
goto _end;
}
STagVal tagVal = {
.cid = pTSchema->numOfCols + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.id.groupId,
};
taosArrayPush(tagArray, &tagVal);
pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
STag* pTag = NULL; // set tag content
tTagNew(tagArray, 1, false, &pTag); tagArray = taosArrayInit(1, sizeof(STagVal));
tagArray = taosArrayDestroy(tagArray); if (!tagArray) {
if (pTag == NULL) { tdDestroySVCreateTbReq(pCreateTbReq);
taosMemoryFree(ctbName); goto _end;
tdDestroySVCreateTbReq(pCreateTbReq); }
terrno = TSDB_CODE_OUT_OF_MEMORY; STagVal tagVal = {
taosMemoryFree(ctbName); .cid = pTSchema->numOfCols + 1,
tdDestroySVCreateTbReq(pCreateTbReq); .type = TSDB_DATA_TYPE_UBIGINT,
goto _end; .i64 = (int64_t)pDataBlock->info.id.groupId,
} };
pCreateTbReq->ctb.pTag = (uint8_t*)pTag; taosArrayPush(tagArray, &tagVal);
pCreateTbReq->ctb.tagNum = taosArrayGetSize(tagArray);
// set tag name STag* pTag = NULL;
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); tTagNew(tagArray, 1, false, &pTag);
char tagNameStr[TSDB_COL_NAME_LEN] = {0}; tagArray = taosArrayDestroy(tagArray);
strcpy(tagNameStr, "group_id"); if (pTag == NULL) {
taosArrayPush(tagName, tagNameStr); tdDestroySVCreateTbReq(pCreateTbReq);
pCreateTbReq->ctb.tagName = tagName; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
pCreateTbReq->ctb.pTag = (uint8_t*)pTag;
// set table name // set tag name
pCreateTbReq->name = ctbName; SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
ctbName = NULL; char tagNameStr[TSDB_COL_NAME_LEN] = {0};
strcpy(tagNameStr, "group_id");
taosArrayPush(tagName, tagNameStr);
pCreateTbReq->ctb.tagName = tagName;
tbData.pCreateTbReq = pCreateTbReq; // set table name
tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE; pCreateTbReq->name = taosStrdup(ctbName);
} else {
if (mr.me.type != TSDB_CHILD_TABLE) { tbData.pCreateTbReq = pCreateTbReq;
tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName, tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE;
mr.me.type); } else {
metaReaderClear(&mr); if (mr.me.type != TSDB_CHILD_TABLE) {
taosMemoryFree(ctbName); tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName,
continue; mr.me.type);
} metaReaderClear(&mr);
taosMemoryFree(pTblMeta);
continue;
}
if (mr.me.ctbEntry.suid != suid) {
tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %" PRId64
", actual suid %" PRId64 "",
TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid);
metaReaderClear(&mr);
taosMemoryFree(pTblMeta);
continue;
}
if (mr.me.ctbEntry.suid != suid) { tbData.uid = mr.me.uid;
tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %" PRId64 pTblMeta->uid = mr.me.uid;
", actual suid %" PRId64 "", tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTblMeta);
TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid);
metaReaderClear(&mr); metaReaderClear(&mr);
taosMemoryFree(ctbName);
continue;
} }
tbData.uid = mr.me.uid;
metaReaderClear(&mr);
taosMemoryFreeClear(ctbName);
} }
// rows // rows
......
...@@ -1583,7 +1583,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock ...@@ -1583,7 +1583,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
// currently only the tbname pseudo column // currently only the tbname pseudo column
if (pInfo->numOfPseudoExpr > 0) { if (pInfo->numOfPseudoExpr > 0) {
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL); pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), &pTableScanInfo->base.metaCache);
// ignore the table not exists error, since this table may have been dropped during the scan procedure. // ignore the table not exists error, since this table may have been dropped during the scan procedure.
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
blockDataFreeRes((SSDataBlock*)pBlock); blockDataFreeRes((SSDataBlock*)pBlock);
......
...@@ -195,6 +195,7 @@ void tFreeStreamTask(SStreamTask* pTask) { ...@@ -195,6 +195,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
if (pTask->outputType == TASK_OUTPUT__TABLE) { if (pTask->outputType == TASK_OUTPUT__TABLE) {
tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper); tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper);
taosMemoryFree(pTask->tbSink.pTSchema); taosMemoryFree(pTask->tbSink.pTSchema);
tSimpleHashCleanup(pTask->tbSink.pTblInfo);
} }
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册