diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 76bfa72caa70e90c78be6ce63b1cbadd09df75ec..9bdcf04e890eb59082bfcb932754808829dce8e8 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -293,7 +293,6 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { SSchemaWrapper* pSchemaWrapper = pTask->tbSink.pSchemaWrapper; int32_t blockSz = taosArrayGetSize(pBlocks); - bool createTb = true; SArray* tagArray = taosArrayInit(1, sizeof(STagVal)); if (!tagArray) { @@ -303,6 +302,7 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, blockSz); for (int32_t i = 0; i < blockSz; i++) { + bool createTb = true; SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); if (pDataBlock->info.type == STREAM_DELETE_RESULT) { SBatchDeleteReq deleteReq = {0}; @@ -337,83 +337,119 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { tqDebug("failed to put delete req into write-queue since %s", terrstr()); } } else { - SVCreateTbReq createTbReq = {0}; - - // set const - createTbReq.flags = 0; - createTbReq.type = TSDB_CHILD_TABLE; - createTbReq.ctb.suid = suid; + char* ctbName = NULL; + // set child table name + if (pDataBlock->info.parTbName[0]) { + ctbName = strdup(pDataBlock->info.parTbName); + } else { + ctbName = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId); + } - // set super table name - SName name = {0}; - tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - createTbReq.ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName); + int32_t schemaLen = 0; + void* schemaStr = NULL; + + int64_t uid = 0; + SMetaReader mr = {0}; + metaReaderInit(&mr, pVnode->pMeta, 0); + if (metaGetTableEntryByName(&mr, ctbName) < 0) { + metaReaderClear(&mr); + tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName); + + SVCreateTbReq createTbReq = {0}; + + // set const + createTbReq.flags = 0; + createTbReq.type = TSDB_CHILD_TABLE; + createTbReq.ctb.suid = suid; + + // set super table name + SName name = {0}; + tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + createTbReq.ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName); + createTbReq.name = ctbName; + ctbName = NULL; + + // set tag content + taosArrayClear(tagArray); + STagVal tagVal = { + .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, + .type = TSDB_DATA_TYPE_UBIGINT, + .i64 = (int64_t)pDataBlock->info.groupId, + }; + taosArrayPush(tagArray, &tagVal); + createTbReq.ctb.tagNum = taosArrayGetSize(tagArray); + + STag* pTag = NULL; + tTagNew(tagArray, 1, false, &pTag); + if (pTag == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosArrayDestroy(tagArray); + tdDestroySVCreateTbReq(&createTbReq); + return; + } + createTbReq.ctb.pTag = (uint8_t*)pTag; + + // set tag name + SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); + char tagNameStr[TSDB_COL_NAME_LEN] = {0}; + strcpy(tagNameStr, "group_id"); + taosArrayPush(tagName, tagNameStr); + createTbReq.ctb.tagName = tagName; + + int32_t code; + tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code); + if (code < 0) { + tdDestroySVCreateTbReq(&createTbReq); + taosArrayDestroy(tagArray); + return; + } - // set tag content - taosArrayClear(tagArray); - STagVal tagVal = { - .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, - .type = TSDB_DATA_TYPE_UBIGINT, - .i64 = (int64_t)pDataBlock->info.groupId, - }; - taosArrayPush(tagArray, &tagVal); - createTbReq.ctb.tagNum = taosArrayGetSize(tagArray); + // set schema str + schemaStr = taosMemoryMalloc(schemaLen); + if (schemaStr == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tdDestroySVCreateTbReq(&createTbReq); + taosArrayDestroy(tagArray); + return; + } - STag* pTag = NULL; - tTagNew(tagArray, 1, false, &pTag); - if (pTag == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosArrayDestroy(tagArray); + SEncoder encoder = {0}; + tEncoderInit(&encoder, schemaStr, schemaLen); + code = tEncodeSVCreateTbReq(&encoder, &createTbReq); + if (code < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tdDestroySVCreateTbReq(&createTbReq); + taosArrayDestroy(tagArray); + tEncoderClear(&encoder); + taosMemoryFree(schemaStr); + return; + } + tEncoderClear(&encoder); tdDestroySVCreateTbReq(&createTbReq); - return; - } - createTbReq.ctb.pTag = (uint8_t*)pTag; - - // set tag name - SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); - char tagNameStr[TSDB_COL_NAME_LEN] = {0}; - strcpy(tagNameStr, "group_id"); - taosArrayPush(tagName, tagNameStr); - createTbReq.ctb.tagName = tagName; - - // set table name - if (pDataBlock->info.parTbName[0]) { - createTbReq.name = strdup(pDataBlock->info.parTbName); } else { - createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId); - } + if (mr.me.type != TSDB_CHILD_TABLE) { + tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName, + mr.me.type); + metaReaderClear(&mr); + taosMemoryFree(ctbName); + continue; + } + if (mr.me.ctbEntry.suid != suid) { + tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %ld, actual suid %ld", + TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry); + metaReaderClear(&mr); + taosMemoryFree(ctbName); + continue; + } - int32_t schemaLen; - int32_t code; - tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code); - if (code < 0) { - tdDestroySVCreateTbReq(&createTbReq); - taosArrayDestroy(tagArray); - return; - } + createTb = false; + uid = mr.me.uid; + metaReaderClear(&mr); - // save schema str - void* schemaStr = taosMemoryMalloc(schemaLen); - if (schemaStr == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tdDestroySVCreateTbReq(&createTbReq); - taosArrayDestroy(tagArray); - return; - } + tqDebug("vgId:%d, stream write, table %s, uid %ld already exist, skip create", TD_VID(pVnode), ctbName, uid); - SEncoder encoder = {0}; - tEncoderInit(&encoder, schemaStr, schemaLen); - code = tEncodeSVCreateTbReq(&encoder, &createTbReq); - if (code < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tdDestroySVCreateTbReq(&createTbReq); - taosArrayDestroy(tagArray); - tEncoderClear(&encoder); - taosMemoryFree(schemaStr); - return; + taosMemoryFreeClear(ctbName); } - tEncoderClear(&encoder); - tdDestroySVCreateTbReq(&createTbReq); int32_t cap = sizeof(SSubmitReq); @@ -445,9 +481,11 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { memcpy(blkSchema, schemaStr, schemaLen); blkHead->schemaLen = htonl(schemaLen); rowData = POINTER_SHIFT(blkSchema, schemaLen); + } else { + blkHead->uid = htobe64(uid); } - taosMemoryFree(schemaStr); + taosMemoryFreeClear(schemaStr); for (int32_t j = 0; j < rows; j++) { SRowBuilder rb = {0}; diff --git a/tests/system-test/7-tmq/tmqAutoCreateTbl.py b/tests/system-test/7-tmq/tmqAutoCreateTbl.py index de95fef148aa29316e0a0a967ddf7e2acc787905..587472ed82052de67678529bf5cefed6921597f2 100644 --- a/tests/system-test/7-tmq/tmqAutoCreateTbl.py +++ b/tests/system-test/7-tmq/tmqAutoCreateTbl.py @@ -42,7 +42,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 500, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 3, + 'pollDelay': 20, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} @@ -87,7 +87,7 @@ class TDTestCase: 'rowsPerTbl': 1000, 'batchNum': 500, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 5, + 'pollDelay': 20, 'showMsg': 1, 'showRow': 1, 'snapshot': 0}