From bcae6e1f4e60bf0f69470e406f3bf5ce082db97c Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 17 Oct 2022 17:18:57 +0800 Subject: [PATCH] refactor(stream): optimize auto create child table --- source/dnode/vnode/src/tq/tqSink.c | 176 +++++++++++++++++------------ 1 file changed, 105 insertions(+), 71 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 76bfa72caa..366cd6268e 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -293,7 +293,6 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { SSchemaWrapper* pSchemaWrapper = pTask->tbSink.pSchemaWrapper; int32_t blockSz = taosArrayGetSize(pBlocks); - bool createTb = true; SArray* tagArray = taosArrayInit(1, sizeof(STagVal)); if (!tagArray) { @@ -303,6 +302,7 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, blockSz); for (int32_t i = 0; i < blockSz; i++) { + bool createTb = true; SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); if (pDataBlock->info.type == STREAM_DELETE_RESULT) { SBatchDeleteReq deleteReq = {0}; @@ -337,83 +337,115 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { tqDebug("failed to put delete req into write-queue since %s", terrstr()); } } else { - SVCreateTbReq createTbReq = {0}; - - // set const - createTbReq.flags = 0; - createTbReq.type = TSDB_CHILD_TABLE; - createTbReq.ctb.suid = suid; - - // set super table name - SName name = {0}; - tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - createTbReq.ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName); - - // set tag content - taosArrayClear(tagArray); - STagVal tagVal = { - .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, - .type = TSDB_DATA_TYPE_UBIGINT, - .i64 = (int64_t)pDataBlock->info.groupId, - }; - taosArrayPush(tagArray, &tagVal); - createTbReq.ctb.tagNum = taosArrayGetSize(tagArray); - - STag* pTag = NULL; - tTagNew(tagArray, 1, false, &pTag); - if (pTag == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosArrayDestroy(tagArray); - tdDestroySVCreateTbReq(&createTbReq); - return; - } - createTbReq.ctb.pTag = (uint8_t*)pTag; - - // set tag name - SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); - char tagNameStr[TSDB_COL_NAME_LEN] = {0}; - strcpy(tagNameStr, "group_id"); - taosArrayPush(tagName, tagNameStr); - createTbReq.ctb.tagName = tagName; - - // set table name + char* ctbName = NULL; + // set child table name if (pDataBlock->info.parTbName[0]) { - createTbReq.name = strdup(pDataBlock->info.parTbName); + ctbName = strdup(pDataBlock->info.parTbName); } else { - createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId); + ctbName = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId); } - int32_t schemaLen; - int32_t code; - tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code); - if (code < 0) { - tdDestroySVCreateTbReq(&createTbReq); - taosArrayDestroy(tagArray); - return; - } + int32_t schemaLen = 0; + void* schemaStr = NULL; + + int64_t uid = 0; + SMetaReader mr = {0}; + metaReaderInit(&mr, pVnode->pMeta, 0); + if (metaGetTableEntryByName(&mr, ctbName) < 0) { + tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName); + + SVCreateTbReq createTbReq = {0}; + + // set const + createTbReq.flags = 0; + createTbReq.type = TSDB_CHILD_TABLE; + createTbReq.ctb.suid = suid; + + // set super table name + SName name = {0}; + tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + createTbReq.ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName); + createTbReq.name = ctbName; + ctbName = NULL; + + // set tag content + taosArrayClear(tagArray); + STagVal tagVal = { + .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, + .type = TSDB_DATA_TYPE_UBIGINT, + .i64 = (int64_t)pDataBlock->info.groupId, + }; + taosArrayPush(tagArray, &tagVal); + createTbReq.ctb.tagNum = taosArrayGetSize(tagArray); + + STag* pTag = NULL; + tTagNew(tagArray, 1, false, &pTag); + if (pTag == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosArrayDestroy(tagArray); + tdDestroySVCreateTbReq(&createTbReq); + return; + } + createTbReq.ctb.pTag = (uint8_t*)pTag; + + // set tag name + SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); + char tagNameStr[TSDB_COL_NAME_LEN] = {0}; + strcpy(tagNameStr, "group_id"); + taosArrayPush(tagName, tagNameStr); + createTbReq.ctb.tagName = tagName; + + int32_t code; + tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code); + if (code < 0) { + tdDestroySVCreateTbReq(&createTbReq); + taosArrayDestroy(tagArray); + return; + } - // save schema str - void* schemaStr = taosMemoryMalloc(schemaLen); - if (schemaStr == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tdDestroySVCreateTbReq(&createTbReq); - taosArrayDestroy(tagArray); - return; - } + // set schema str + schemaStr = taosMemoryMalloc(schemaLen); + if (schemaStr == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tdDestroySVCreateTbReq(&createTbReq); + taosArrayDestroy(tagArray); + return; + } - SEncoder encoder = {0}; - tEncoderInit(&encoder, schemaStr, schemaLen); - code = tEncodeSVCreateTbReq(&encoder, &createTbReq); - if (code < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tdDestroySVCreateTbReq(&createTbReq); - taosArrayDestroy(tagArray); + SEncoder encoder = {0}; + tEncoderInit(&encoder, schemaStr, schemaLen); + code = tEncodeSVCreateTbReq(&encoder, &createTbReq); + if (code < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tdDestroySVCreateTbReq(&createTbReq); + taosArrayDestroy(tagArray); + tEncoderClear(&encoder); + taosMemoryFree(schemaStr); + return; + } tEncoderClear(&encoder); - taosMemoryFree(schemaStr); - return; + tdDestroySVCreateTbReq(&createTbReq); + } else { + if (mr.me.type != TSDB_CHILD_TABLE) { + tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName, + mr.me.type); + metaReaderClear(&mr); + taosMemoryFree(ctbName); + continue; + } + if (mr.me.ctbEntry.suid != suid) { + tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %ld, actual suid %ld", + TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry); + metaReaderClear(&mr); + taosMemoryFree(ctbName); + continue; + } + + createTb = false; + uid = mr.me.uid; + metaReaderClear(&mr); + taosMemoryFreeClear(ctbName); } - tEncoderClear(&encoder); - tdDestroySVCreateTbReq(&createTbReq); int32_t cap = sizeof(SSubmitReq); @@ -445,9 +477,11 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { memcpy(blkSchema, schemaStr, schemaLen); blkHead->schemaLen = htonl(schemaLen); rowData = POINTER_SHIFT(blkSchema, schemaLen); + } else { + blkHead->uid = uid; } - taosMemoryFree(schemaStr); + taosMemoryFreeClear(schemaStr); for (int32_t j = 0; j < rows; j++) { SRowBuilder rb = {0}; -- GitLab