未验证 提交 1c245904 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #17406 from taosdata/feature/stream

refactor(stream): optimize auto create child table
...@@ -293,7 +293,6 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ...@@ -293,7 +293,6 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
SSchemaWrapper* pSchemaWrapper = pTask->tbSink.pSchemaWrapper; SSchemaWrapper* pSchemaWrapper = pTask->tbSink.pSchemaWrapper;
int32_t blockSz = taosArrayGetSize(pBlocks); int32_t blockSz = taosArrayGetSize(pBlocks);
bool createTb = true;
SArray* tagArray = taosArrayInit(1, sizeof(STagVal)); SArray* tagArray = taosArrayInit(1, sizeof(STagVal));
if (!tagArray) { if (!tagArray) {
...@@ -303,6 +302,7 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ...@@ -303,6 +302,7 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, blockSz); tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, blockSz);
for (int32_t i = 0; i < blockSz; i++) { for (int32_t i = 0; i < blockSz; i++) {
bool createTb = true;
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock->info.type == STREAM_DELETE_RESULT) { if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
SBatchDeleteReq deleteReq = {0}; SBatchDeleteReq deleteReq = {0};
...@@ -337,6 +337,24 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ...@@ -337,6 +337,24 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
tqDebug("failed to put delete req into write-queue since %s", terrstr()); tqDebug("failed to put delete req into write-queue since %s", terrstr());
} }
} else { } else {
char* ctbName = NULL;
// set child table name
if (pDataBlock->info.parTbName[0]) {
ctbName = strdup(pDataBlock->info.parTbName);
} else {
ctbName = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
}
int32_t schemaLen = 0;
void* schemaStr = NULL;
int64_t uid = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, 0);
if (metaGetTableEntryByName(&mr, ctbName) < 0) {
metaReaderClear(&mr);
tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName);
SVCreateTbReq createTbReq = {0}; SVCreateTbReq createTbReq = {0};
// set const // set const
...@@ -348,6 +366,8 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ...@@ -348,6 +366,8 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
SName name = {0}; SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
createTbReq.ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName); createTbReq.ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName);
createTbReq.name = ctbName;
ctbName = NULL;
// set tag content // set tag content
taosArrayClear(tagArray); taosArrayClear(tagArray);
...@@ -376,14 +396,6 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ...@@ -376,14 +396,6 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
taosArrayPush(tagName, tagNameStr); taosArrayPush(tagName, tagNameStr);
createTbReq.ctb.tagName = tagName; createTbReq.ctb.tagName = tagName;
// set table name
if (pDataBlock->info.parTbName[0]) {
createTbReq.name = strdup(pDataBlock->info.parTbName);
} else {
createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
}
int32_t schemaLen;
int32_t code; int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code); tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) { if (code < 0) {
...@@ -392,8 +404,8 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ...@@ -392,8 +404,8 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
return; return;
} }
// save schema str // set schema str
void* schemaStr = taosMemoryMalloc(schemaLen); schemaStr = taosMemoryMalloc(schemaLen);
if (schemaStr == NULL) { if (schemaStr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tdDestroySVCreateTbReq(&createTbReq); tdDestroySVCreateTbReq(&createTbReq);
...@@ -414,6 +426,30 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ...@@ -414,6 +426,30 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
tdDestroySVCreateTbReq(&createTbReq); tdDestroySVCreateTbReq(&createTbReq);
} else {
if (mr.me.type != TSDB_CHILD_TABLE) {
tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName,
mr.me.type);
metaReaderClear(&mr);
taosMemoryFree(ctbName);
continue;
}
if (mr.me.ctbEntry.suid != suid) {
tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %ld, actual suid %ld",
TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry);
metaReaderClear(&mr);
taosMemoryFree(ctbName);
continue;
}
createTb = false;
uid = mr.me.uid;
metaReaderClear(&mr);
tqDebug("vgId:%d, stream write, table %s, uid %ld already exist, skip create", TD_VID(pVnode), ctbName, uid);
taosMemoryFreeClear(ctbName);
}
int32_t cap = sizeof(SSubmitReq); int32_t cap = sizeof(SSubmitReq);
...@@ -445,9 +481,11 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ...@@ -445,9 +481,11 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
memcpy(blkSchema, schemaStr, schemaLen); memcpy(blkSchema, schemaStr, schemaLen);
blkHead->schemaLen = htonl(schemaLen); blkHead->schemaLen = htonl(schemaLen);
rowData = POINTER_SHIFT(blkSchema, schemaLen); rowData = POINTER_SHIFT(blkSchema, schemaLen);
} else {
blkHead->uid = htobe64(uid);
} }
taosMemoryFree(schemaStr); taosMemoryFreeClear(schemaStr);
for (int32_t j = 0; j < rows; j++) { for (int32_t j = 0; j < rows; j++) {
SRowBuilder rb = {0}; SRowBuilder rb = {0};
......
...@@ -42,7 +42,7 @@ class TDTestCase: ...@@ -42,7 +42,7 @@ class TDTestCase:
'rowsPerTbl': 1000, 'rowsPerTbl': 1000,
'batchNum': 500, 'batchNum': 500,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3, 'pollDelay': 20,
'showMsg': 1, 'showMsg': 1,
'showRow': 1, 'showRow': 1,
'snapshot': 0} 'snapshot': 0}
...@@ -87,7 +87,7 @@ class TDTestCase: ...@@ -87,7 +87,7 @@ class TDTestCase:
'rowsPerTbl': 1000, 'rowsPerTbl': 1000,
'batchNum': 500, 'batchNum': 500,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5, 'pollDelay': 20,
'showMsg': 1, 'showMsg': 1,
'showRow': 1, 'showRow': 1,
'snapshot': 0} 'snapshot': 0}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册