From d96cdff8f39f283c3bdb0c9ac6fb37b727ff545f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 28 Jan 2023 18:25:43 +0800 Subject: [PATCH] fix:add logic for auto create table while inserting data for taosx --- source/dnode/vnode/inc/vnode.h | 4 +- source/dnode/vnode/src/tq/tqExec.c | 89 ++++++++++++++++--------- source/dnode/vnode/src/tq/tqRead.c | 9 +-- source/libs/executor/src/scanoperator.c | 4 +- 4 files changed, 65 insertions(+), 41 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 3f3e287bb9..33325d41f3 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -266,8 +266,8 @@ int32_t tqReaderSetSubmitReq2(STqReader *pReader, void *msgStr, int32_t msgLen, // int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver); bool tqNextDataBlock2(STqReader *pReader); bool tqNextDataBlockFilterOut2(STqReader *pReader, SHashObj *filterOutUids); -int32_t tqRetrieveDataBlock2(SSDataBlock *pBlock, STqReader *pReader); -int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas); +int32_t tqRetrieveDataBlock2(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData** pSubmitTbDataRet); +int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData** pSubmitTbDataRet); // int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader); // int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 40a82cc8e8..562e188927 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -241,7 +241,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR taosArrayClear(pBlocks); taosArrayClear(pSchemas); - if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas) < 0) { + SSubmitTbData* pSubmitTbDataRet = NULL; + if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) { if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; } if (pRsp->withTbName) { @@ -255,23 +256,33 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR continue; } } - if (pHandle->fetchMeta) { -#if 0 - SSubmitBlk* pBlk = pReader->pBlock; - int64_t uid = pExec->pExecReader->lastBlkUid; - int32_t schemaLen = htonl(pBlk->schemaLen); - if (schemaLen > 0) { - if (pRsp->createTableNum == 0) { - pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); - pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); - } - void* createReq = taosMemoryCalloc(1, schemaLen); - memcpy(createReq, pBlk->data, schemaLen); - taosArrayPush(pRsp->createTableLen, &schemaLen); - taosArrayPush(pRsp->createTableReq, &createReq); - pRsp->createTableNum++; + if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) { + if (pRsp->createTableNum == 0) { + pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); + pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); + } + + int32_t code = TSDB_CODE_SUCCESS; + uint32_t len = 0; + tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code); + if (TSDB_CODE_SUCCESS != code) { + continue; } -#endif + void* createReq = taosMemoryCalloc(1, len); + SEncoder encoder = {0}; + tEncoderInit(&encoder, createReq, len); + code = tEncodeSVCreateTbReq(&encoder, pSubmitTbDataRet->pCreateTbReq); + if (code < 0) { + tEncoderClear(&encoder); + taosMemoryFree(createReq); + continue; + } + + taosArrayPush(pRsp->createTableLen, &len); + taosArrayPush(pRsp->createTableReq, &createReq); + pRsp->createTableNum++; + + tEncoderClear(&encoder); } for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) { SSDataBlock* pBlock = taosArrayGet(pBlocks, i); @@ -294,7 +305,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR /*}*/ taosArrayClear(pBlocks); taosArrayClear(pSchemas); - if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas) < 0) { + SSubmitTbData* pSubmitTbDataRet = NULL; + if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) { if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; } if (pRsp->withTbName) { @@ -307,22 +319,33 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR continue; } } - if (pHandle->fetchMeta) { -#if 0 - SSubmitBlk* pBlk = pReader->pBlock; - int32_t schemaLen = htonl(pBlk->schemaLen); - if (schemaLen > 0) { - if (pRsp->createTableNum == 0) { - pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); - pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); - } - void* createReq = taosMemoryCalloc(1, schemaLen); - memcpy(createReq, pBlk->data, schemaLen); - taosArrayPush(pRsp->createTableLen, &schemaLen); - taosArrayPush(pRsp->createTableReq, &createReq); - pRsp->createTableNum++; + if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) { + if (pRsp->createTableNum == 0) { + pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); + pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); + } + + int32_t code = TSDB_CODE_SUCCESS; + uint32_t len = 0; + tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code); + if (TSDB_CODE_SUCCESS != code) { + continue; } -#endif + void* createReq = taosMemoryCalloc(1, len); + SEncoder encoder = {0}; + tEncoderInit(&encoder, createReq, len); + code = tEncodeSVCreateTbReq(&encoder, pSubmitTbDataRet->pCreateTbReq); + if (code < 0) { + tEncoderClear(&encoder); + taosMemoryFree(createReq); + continue; + } + + taosArrayPush(pRsp->createTableLen, &len); + taosArrayPush(pRsp->createTableReq, &createReq); + pRsp->createTableNum++; + + tEncoderClear(&encoder); } /*tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),*/ /*pTq->pVnode->config.tsdbCfg.precision);*/ diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index eb9c0c3eeb..e6d81f5b10 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -332,7 +332,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { while (tqNextDataBlock2(pReader)) { // TODO mem free memset(&ret->data, 0, sizeof(SSDataBlock)); - int32_t code = tqRetrieveDataBlock2(&ret->data, pReader); + int32_t code = tqRetrieveDataBlock2(&ret->data, pReader, NULL); if (code != 0 || ret->data.info.rows == 0) { continue; } @@ -550,7 +550,7 @@ int32_t tqScanSubmitSplit(SArray* pBlocks, SArray* schemas, STqReader* pReader) } #endif -int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) { +int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) { int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); ASSERT(pReader->nextBlk < blockSz); @@ -559,6 +559,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) { SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); pReader->nextBlk++; + if(pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData; int32_t sversion = pSubmitTbData->sver; int64_t suid = pSubmitTbData->suid; int64_t uid = pSubmitTbData->uid; @@ -1006,9 +1007,9 @@ FAIL: } #endif -int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schemas) { +int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) { SSDataBlock block = {0}; - if (tqRetrieveDataBlock2(&block, pReader) == 0) { + if (tqRetrieveDataBlock2(&block, pReader, pSubmitTbDataRet) == 0) { taosArrayPush(blocks, &block); SSchemaWrapper* pSW = tCloneSSchemaWrapper(pReader->pSchemaWrapper); taosArrayPush(schemas, &pSW); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 37c33c44e2..7529a5f8f7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1532,7 +1532,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { while (tqNextDataBlock2(pInfo->tqReader)) { SSDataBlock block = {0}; - int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader); + int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL); if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) { continue; @@ -1941,7 +1941,7 @@ FETCH_NEXT_BLOCK: while (tqNextDataBlock2(pInfo->tqReader)) { SSDataBlock block = {0}; - int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader); + int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL); if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) { continue; -- GitLab