From e7fa3b0af007d23ef306cd2437f806001dade505 Mon Sep 17 00:00:00 2001 From: zhihaop Date: Fri, 23 Sep 2022 23:34:30 +0800 Subject: [PATCH] fix: pInsertParam->numOfRows is incorrect after sqlobjs merge --- src/client/src/tscUtil.c | 65 +++++++++++++++++++++++----------------- 1 file changed, 37 insertions(+), 28 deletions(-) diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index f1c8a6b622..83e5804a9f 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2294,9 +2294,10 @@ static int32_t compareSMemRow(const void *x, const void *y) { * * @param builder the SSubmitBlkBuilder. * @param target the target to write. + * @param nRows the number of rows in SSubmitBlk*. * @return the writen bytes. */ -static size_t writeSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* target) { +static size_t writeSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* target, size_t* nRows) { memcpy(target, builder->metadata, sizeof(SSubmitBlk)); uint32_t dataLen = 0; @@ -2307,9 +2308,11 @@ static size_t writeSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* tar dataLen += memRowTLen(pRow); } + *nRows = taosArrayGetSize(builder->rows); + target->schemaLen = 0; - target->dataLen = htonl(dataLen); - target->numOfRows = htons(taosArrayGetSize(builder->rows)); + target->dataLen = (int32_t) htonl(dataLen); + target->numOfRows = (int16_t) htons(*nRows); return dataLen + sizeof(SSubmitBlk); } @@ -2323,7 +2326,7 @@ static size_t writeSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* tar static size_t writenSizeSSubmitBlkBuilder(SSubmitBlkBuilder* builder) { size_t dataLen = 0; for (int i = 0; i < taosArrayGetSize(builder->rows); ++i) { - char* pRow = *(char**) (taosArrayGet(builder->rows, i)); + char* pRow = taosArrayGetP(builder->rows, i); dataLen += memRowTLen(pRow); } return dataLen + sizeof(SSubmitBlk); @@ -2381,18 +2384,21 @@ static size_t writenSizeSSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder) { * * @param builder the SSubmitBlkBuilder. * @param pBlocks the target to write. + * @param nRows the number of row in SSubmitMsg::blocks. * @return the writen bytes. */ -size_t writeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks) { +size_t writeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t* nRows) { size_t nWrite = 0; SSubmitBlkBuilder** iter = taosHashIterate(builder->blockBuilders, NULL); + while (iter) { + size_t nSubRows = 0; SSubmitBlkBuilder* blocksBuilder = *iter; SSubmitBlk* pBlock = POINTER_SHIFT(pBlocks, nWrite); - nWrite += writeSSubmitBlkBuilder(blocksBuilder, pBlock); + nWrite += writeSSubmitBlkBuilder(blocksBuilder, pBlock, &nSubRows); + *nRows += nSubRows; iter = taosHashIterate(builder->blockBuilders, iter); } - return nWrite; } @@ -2401,7 +2407,7 @@ size_t writeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk * @param builder the SSubmitMsgBlocksBuilder. * @return the number of SSubmitBlk block. */ -size_t blockSizeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder) { +inline static size_t nBlockSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder) { return taosHashGetSize(builder->blockBuilders); } @@ -2456,12 +2462,12 @@ static SSubmitBlkBuilder* computeIfAbsentSSubmitBlkBuilder(SSubmitMsgBlocksBuild * * @param builder the SSubmitMsgBlocksBuilder. * @param pMsg the SSubmitMsg* - * @param numOfBlocks the number of blocks in SSubmitMsg. + * @param nBlocks the number of blocks in SSubmitMsg. * @return whether the append is success. */ -static bool appendSSubmitMsgBlocks(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t numOfBlocks) { +static bool appendSSubmitMsgBlocks(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t nBlocks) { SSubmitBlk* pBlock = pBlocks; - for (size_t i = 0; i < numOfBlocks; ++i) { + for (size_t i = 0; i < nBlocks; ++i) { assert(pBlock->schemaLen == 0); SSubmitBlkBuilder* blocksBuilder = computeIfAbsentSSubmitBlkBuilder(builder, pBlock); if (!blocksBuilder) { @@ -2544,10 +2550,11 @@ static bool appendSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, STab /** * Build the data blocks for single vnode. - * @param builder the STableDataBlocksBuilder. - * @return the data blocks for single vnode. + * @param builder the STableDataBlocksBuilder. + * @param nRows the number of row in STableDataBlocks. + * @return the data blocks for single vnode. */ -static STableDataBlocks* buildSTableDataBlocksBuilder(STableDataBlocksBuilder* builder) { +static STableDataBlocks* buildSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, size_t* nRows) { SSubmitMsgBlocksBuilder* blocksBuilder = builder->blocksBuilder; STableDataBlocks *firstBlock = builder->firstBlock; if (!firstBlock) { @@ -2559,15 +2566,15 @@ static STableDataBlocks* buildSTableDataBlocksBuilder(STableDataBlocksBuilder* b size_t nAllocSize = nWriteSize + nHeaderSize; STableDataBlocks* dataBlocks = NULL; - int32_t code = tscCreateDataBlock(nAllocSize, 0, nHeaderSize, &firstBlock->tableName, firstBlock->pTableMeta, &dataBlocks); + int32_t code = tscCreateDataBlock(nAllocSize, 0, (int32_t) nHeaderSize, &firstBlock->tableName, firstBlock->pTableMeta, &dataBlocks); if (code != TSDB_CODE_SUCCESS) { return NULL; } dataBlocks->size = nHeaderSize; memcpy(dataBlocks->pData, firstBlock->pData, nHeaderSize); - dataBlocks->size += writeSSubmitMsgBlocksBuilder(blocksBuilder, (SSubmitBlk *) (dataBlocks->pData + nHeaderSize)); - dataBlocks->numOfTables = blockSizeSSubmitMsgBlocksBuilder(blocksBuilder); + dataBlocks->size += writeSSubmitMsgBlocksBuilder(blocksBuilder, (SSubmitBlk *) (dataBlocks->pData + nHeaderSize), nRows); + dataBlocks->numOfTables = (int32_t) nBlockSSubmitMsgBlocksBuilder(blocksBuilder); return dataBlocks; } @@ -2647,11 +2654,12 @@ static bool appendSTableDataBlocksListBuilder(STableDataBlocksListBuilder* build /** * Build the vnode data blocks list. * - * @param builder the STableDataBlocksListBuilder. - * @param numOfTables the number of tables. - * @return the vnode data blocks list. + * @param builder the STableDataBlocksListBuilder. + * @param nTables the number of table in vnode data blocks list. + * @param nRows the number of row in vnode data blocks list. + * @return the vnode data blocks list. */ -static SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, size_t* numOfTables) { +static SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, size_t* nTables, size_t* nRows) { SArray* pVnodeDataBlockList = taosArrayInit(taosHashGetSize(builder->dataBlocksBuilders), sizeof(STableDataBlocks*)); if (!pVnodeDataBlockList) { return NULL; @@ -2659,12 +2667,14 @@ static SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* bui STableDataBlocksBuilder** iter = taosHashIterate(builder->dataBlocksBuilders, NULL); while (iter) { + size_t nSubRows = 0; STableDataBlocksBuilder* dataBlocksBuilder = *iter; - STableDataBlocks* dataBlocks = buildSTableDataBlocksBuilder(dataBlocksBuilder); + STableDataBlocks* dataBlocks = buildSTableDataBlocksBuilder(dataBlocksBuilder, &nSubRows); if (!dataBlocks) { goto error; } - *numOfTables += dataBlocks->numOfTables; + *nTables += dataBlocks->numOfTables; + *nRows += nSubRows; taosArrayPush(pVnodeDataBlockList, &dataBlocks); iter = taosHashIterate(builder->dataBlocksBuilders, iter); @@ -2673,7 +2683,7 @@ static SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* bui error: for (int i = 0; i < taosArrayGetSize(pVnodeDataBlockList); ++i) { - STableDataBlocks* dataBlocks = *((STableDataBlocks**)taosArrayGet(pVnodeDataBlockList, i)); + STableDataBlocks* dataBlocks = taosArrayGetP(pVnodeDataBlockList, i); tscDestroyDataBlock(NULL, dataBlocks, false); } taosArrayDestroy(&pVnodeDataBlockList); @@ -2850,8 +2860,9 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result) { // build the vnode data blocks. size_t numOfBlocks = 0; + size_t numOfRows = 0; SInsertStatementParam* pInsertParam = &result->cmd.insertParam; - SArray* pVnodeDataBlocksList = buildSTableDataBlocksListBuilder(builder, &numOfBlocks); + SArray* pVnodeDataBlocksList = buildSTableDataBlocksListBuilder(builder, &numOfBlocks, &numOfRows); if (!pVnodeDataBlocksList) { destroySTableDataBlocksListBuilder(builder); destroySTableNameListBuilder(nameListBuilder); @@ -2866,9 +2877,6 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result) { destroySTableNameListBuilder(nameListBuilder); return TSDB_CODE_TSC_OUT_OF_MEMORY; } - if (numOfTables != numOfBlocks) { - printf("numOfTables=%zu, numOfBlocks=%zu\n", numOfTables, numOfBlocks); - } assert(numOfTables == numOfBlocks); // replace table name list. @@ -2891,6 +2899,7 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result) { taosArrayDestroy(&pInsertParam->pDataBlocks); } pInsertParam->pDataBlocks = pVnodeDataBlocksList; + pInsertParam->numOfRows = (int32_t) numOfRows; // clean up. destroySTableDataBlocksListBuilder(builder); -- GitLab