提交 e7fa3b0a 编写于 作者: Z zhihaop

fix: pInsertParam->numOfRows is incorrect after sqlobjs merge

上级 8832b4b1
...@@ -2294,9 +2294,10 @@ static int32_t compareSMemRow(const void *x, const void *y) { ...@@ -2294,9 +2294,10 @@ static int32_t compareSMemRow(const void *x, const void *y) {
* *
* @param builder the SSubmitBlkBuilder. * @param builder the SSubmitBlkBuilder.
* @param target the target to write. * @param target the target to write.
* @param nRows the number of rows in SSubmitBlk*.
* @return the writen bytes. * @return the writen bytes.
*/ */
static size_t writeSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* target) { static size_t writeSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* target, size_t* nRows) {
memcpy(target, builder->metadata, sizeof(SSubmitBlk)); memcpy(target, builder->metadata, sizeof(SSubmitBlk));
uint32_t dataLen = 0; uint32_t dataLen = 0;
...@@ -2307,9 +2308,11 @@ static size_t writeSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* tar ...@@ -2307,9 +2308,11 @@ static size_t writeSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* tar
dataLen += memRowTLen(pRow); dataLen += memRowTLen(pRow);
} }
*nRows = taosArrayGetSize(builder->rows);
target->schemaLen = 0; target->schemaLen = 0;
target->dataLen = htonl(dataLen); target->dataLen = (int32_t) htonl(dataLen);
target->numOfRows = htons(taosArrayGetSize(builder->rows)); target->numOfRows = (int16_t) htons(*nRows);
return dataLen + sizeof(SSubmitBlk); return dataLen + sizeof(SSubmitBlk);
} }
...@@ -2323,7 +2326,7 @@ static size_t writeSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* tar ...@@ -2323,7 +2326,7 @@ static size_t writeSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* tar
static size_t writenSizeSSubmitBlkBuilder(SSubmitBlkBuilder* builder) { static size_t writenSizeSSubmitBlkBuilder(SSubmitBlkBuilder* builder) {
size_t dataLen = 0; size_t dataLen = 0;
for (int i = 0; i < taosArrayGetSize(builder->rows); ++i) { for (int i = 0; i < taosArrayGetSize(builder->rows); ++i) {
char* pRow = *(char**) (taosArrayGet(builder->rows, i)); char* pRow = taosArrayGetP(builder->rows, i);
dataLen += memRowTLen(pRow); dataLen += memRowTLen(pRow);
} }
return dataLen + sizeof(SSubmitBlk); return dataLen + sizeof(SSubmitBlk);
...@@ -2381,18 +2384,21 @@ static size_t writenSizeSSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder) { ...@@ -2381,18 +2384,21 @@ static size_t writenSizeSSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder) {
* *
* @param builder the SSubmitBlkBuilder. * @param builder the SSubmitBlkBuilder.
* @param pBlocks the target to write. * @param pBlocks the target to write.
* @param nRows the number of row in SSubmitMsg::blocks.
* @return the writen bytes. * @return the writen bytes.
*/ */
size_t writeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks) { size_t writeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t* nRows) {
size_t nWrite = 0; size_t nWrite = 0;
SSubmitBlkBuilder** iter = taosHashIterate(builder->blockBuilders, NULL); SSubmitBlkBuilder** iter = taosHashIterate(builder->blockBuilders, NULL);
while (iter) { while (iter) {
size_t nSubRows = 0;
SSubmitBlkBuilder* blocksBuilder = *iter; SSubmitBlkBuilder* blocksBuilder = *iter;
SSubmitBlk* pBlock = POINTER_SHIFT(pBlocks, nWrite); SSubmitBlk* pBlock = POINTER_SHIFT(pBlocks, nWrite);
nWrite += writeSSubmitBlkBuilder(blocksBuilder, pBlock); nWrite += writeSSubmitBlkBuilder(blocksBuilder, pBlock, &nSubRows);
*nRows += nSubRows;
iter = taosHashIterate(builder->blockBuilders, iter); iter = taosHashIterate(builder->blockBuilders, iter);
} }
return nWrite; return nWrite;
} }
...@@ -2401,7 +2407,7 @@ size_t writeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk ...@@ -2401,7 +2407,7 @@ size_t writeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk
* @param builder the SSubmitMsgBlocksBuilder. * @param builder the SSubmitMsgBlocksBuilder.
* @return the number of SSubmitBlk block. * @return the number of SSubmitBlk block.
*/ */
size_t blockSizeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder) { inline static size_t nBlockSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder) {
return taosHashGetSize(builder->blockBuilders); return taosHashGetSize(builder->blockBuilders);
} }
...@@ -2456,12 +2462,12 @@ static SSubmitBlkBuilder* computeIfAbsentSSubmitBlkBuilder(SSubmitMsgBlocksBuild ...@@ -2456,12 +2462,12 @@ static SSubmitBlkBuilder* computeIfAbsentSSubmitBlkBuilder(SSubmitMsgBlocksBuild
* *
* @param builder the SSubmitMsgBlocksBuilder. * @param builder the SSubmitMsgBlocksBuilder.
* @param pMsg the SSubmitMsg* * @param pMsg the SSubmitMsg*
* @param numOfBlocks the number of blocks in SSubmitMsg. * @param nBlocks the number of blocks in SSubmitMsg.
* @return whether the append is success. * @return whether the append is success.
*/ */
static bool appendSSubmitMsgBlocks(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t numOfBlocks) { static bool appendSSubmitMsgBlocks(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t nBlocks) {
SSubmitBlk* pBlock = pBlocks; SSubmitBlk* pBlock = pBlocks;
for (size_t i = 0; i < numOfBlocks; ++i) { for (size_t i = 0; i < nBlocks; ++i) {
assert(pBlock->schemaLen == 0); assert(pBlock->schemaLen == 0);
SSubmitBlkBuilder* blocksBuilder = computeIfAbsentSSubmitBlkBuilder(builder, pBlock); SSubmitBlkBuilder* blocksBuilder = computeIfAbsentSSubmitBlkBuilder(builder, pBlock);
if (!blocksBuilder) { if (!blocksBuilder) {
...@@ -2545,9 +2551,10 @@ static bool appendSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, STab ...@@ -2545,9 +2551,10 @@ static bool appendSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, STab
/** /**
* Build the data blocks for single vnode. * Build the data blocks for single vnode.
* @param builder the STableDataBlocksBuilder. * @param builder the STableDataBlocksBuilder.
* @param nRows the number of row in STableDataBlocks.
* @return the data blocks for single vnode. * @return the data blocks for single vnode.
*/ */
static STableDataBlocks* buildSTableDataBlocksBuilder(STableDataBlocksBuilder* builder) { static STableDataBlocks* buildSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, size_t* nRows) {
SSubmitMsgBlocksBuilder* blocksBuilder = builder->blocksBuilder; SSubmitMsgBlocksBuilder* blocksBuilder = builder->blocksBuilder;
STableDataBlocks *firstBlock = builder->firstBlock; STableDataBlocks *firstBlock = builder->firstBlock;
if (!firstBlock) { if (!firstBlock) {
...@@ -2559,15 +2566,15 @@ static STableDataBlocks* buildSTableDataBlocksBuilder(STableDataBlocksBuilder* b ...@@ -2559,15 +2566,15 @@ static STableDataBlocks* buildSTableDataBlocksBuilder(STableDataBlocksBuilder* b
size_t nAllocSize = nWriteSize + nHeaderSize; size_t nAllocSize = nWriteSize + nHeaderSize;
STableDataBlocks* dataBlocks = NULL; STableDataBlocks* dataBlocks = NULL;
int32_t code = tscCreateDataBlock(nAllocSize, 0, nHeaderSize, &firstBlock->tableName, firstBlock->pTableMeta, &dataBlocks); int32_t code = tscCreateDataBlock(nAllocSize, 0, (int32_t) nHeaderSize, &firstBlock->tableName, firstBlock->pTableMeta, &dataBlocks);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
} }
dataBlocks->size = nHeaderSize; dataBlocks->size = nHeaderSize;
memcpy(dataBlocks->pData, firstBlock->pData, nHeaderSize); memcpy(dataBlocks->pData, firstBlock->pData, nHeaderSize);
dataBlocks->size += writeSSubmitMsgBlocksBuilder(blocksBuilder, (SSubmitBlk *) (dataBlocks->pData + nHeaderSize)); dataBlocks->size += writeSSubmitMsgBlocksBuilder(blocksBuilder, (SSubmitBlk *) (dataBlocks->pData + nHeaderSize), nRows);
dataBlocks->numOfTables = blockSizeSSubmitMsgBlocksBuilder(blocksBuilder); dataBlocks->numOfTables = (int32_t) nBlockSSubmitMsgBlocksBuilder(blocksBuilder);
return dataBlocks; return dataBlocks;
} }
...@@ -2648,10 +2655,11 @@ static bool appendSTableDataBlocksListBuilder(STableDataBlocksListBuilder* build ...@@ -2648,10 +2655,11 @@ static bool appendSTableDataBlocksListBuilder(STableDataBlocksListBuilder* build
* Build the vnode data blocks list. * Build the vnode data blocks list.
* *
* @param builder the STableDataBlocksListBuilder. * @param builder the STableDataBlocksListBuilder.
* @param numOfTables the number of tables. * @param nTables the number of table in vnode data blocks list.
* @param nRows the number of row in vnode data blocks list.
* @return the vnode data blocks list. * @return the vnode data blocks list.
*/ */
static SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, size_t* numOfTables) { static SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, size_t* nTables, size_t* nRows) {
SArray* pVnodeDataBlockList = taosArrayInit(taosHashGetSize(builder->dataBlocksBuilders), sizeof(STableDataBlocks*)); SArray* pVnodeDataBlockList = taosArrayInit(taosHashGetSize(builder->dataBlocksBuilders), sizeof(STableDataBlocks*));
if (!pVnodeDataBlockList) { if (!pVnodeDataBlockList) {
return NULL; return NULL;
...@@ -2659,12 +2667,14 @@ static SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* bui ...@@ -2659,12 +2667,14 @@ static SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* bui
STableDataBlocksBuilder** iter = taosHashIterate(builder->dataBlocksBuilders, NULL); STableDataBlocksBuilder** iter = taosHashIterate(builder->dataBlocksBuilders, NULL);
while (iter) { while (iter) {
size_t nSubRows = 0;
STableDataBlocksBuilder* dataBlocksBuilder = *iter; STableDataBlocksBuilder* dataBlocksBuilder = *iter;
STableDataBlocks* dataBlocks = buildSTableDataBlocksBuilder(dataBlocksBuilder); STableDataBlocks* dataBlocks = buildSTableDataBlocksBuilder(dataBlocksBuilder, &nSubRows);
if (!dataBlocks) { if (!dataBlocks) {
goto error; goto error;
} }
*numOfTables += dataBlocks->numOfTables; *nTables += dataBlocks->numOfTables;
*nRows += nSubRows;
taosArrayPush(pVnodeDataBlockList, &dataBlocks); taosArrayPush(pVnodeDataBlockList, &dataBlocks);
iter = taosHashIterate(builder->dataBlocksBuilders, iter); iter = taosHashIterate(builder->dataBlocksBuilders, iter);
...@@ -2673,7 +2683,7 @@ static SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* bui ...@@ -2673,7 +2683,7 @@ static SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* bui
error: error:
for (int i = 0; i < taosArrayGetSize(pVnodeDataBlockList); ++i) { for (int i = 0; i < taosArrayGetSize(pVnodeDataBlockList); ++i) {
STableDataBlocks* dataBlocks = *((STableDataBlocks**)taosArrayGet(pVnodeDataBlockList, i)); STableDataBlocks* dataBlocks = taosArrayGetP(pVnodeDataBlockList, i);
tscDestroyDataBlock(NULL, dataBlocks, false); tscDestroyDataBlock(NULL, dataBlocks, false);
} }
taosArrayDestroy(&pVnodeDataBlockList); taosArrayDestroy(&pVnodeDataBlockList);
...@@ -2850,8 +2860,9 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result) { ...@@ -2850,8 +2860,9 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result) {
// build the vnode data blocks. // build the vnode data blocks.
size_t numOfBlocks = 0; size_t numOfBlocks = 0;
size_t numOfRows = 0;
SInsertStatementParam* pInsertParam = &result->cmd.insertParam; SInsertStatementParam* pInsertParam = &result->cmd.insertParam;
SArray* pVnodeDataBlocksList = buildSTableDataBlocksListBuilder(builder, &numOfBlocks); SArray* pVnodeDataBlocksList = buildSTableDataBlocksListBuilder(builder, &numOfBlocks, &numOfRows);
if (!pVnodeDataBlocksList) { if (!pVnodeDataBlocksList) {
destroySTableDataBlocksListBuilder(builder); destroySTableDataBlocksListBuilder(builder);
destroySTableNameListBuilder(nameListBuilder); destroySTableNameListBuilder(nameListBuilder);
...@@ -2866,9 +2877,6 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result) { ...@@ -2866,9 +2877,6 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result) {
destroySTableNameListBuilder(nameListBuilder); destroySTableNameListBuilder(nameListBuilder);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
if (numOfTables != numOfBlocks) {
printf("numOfTables=%zu, numOfBlocks=%zu\n", numOfTables, numOfBlocks);
}
assert(numOfTables == numOfBlocks); assert(numOfTables == numOfBlocks);
// replace table name list. // replace table name list.
...@@ -2891,6 +2899,7 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result) { ...@@ -2891,6 +2899,7 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result) {
taosArrayDestroy(&pInsertParam->pDataBlocks); taosArrayDestroy(&pInsertParam->pDataBlocks);
} }
pInsertParam->pDataBlocks = pVnodeDataBlocksList; pInsertParam->pDataBlocks = pVnodeDataBlocksList;
pInsertParam->numOfRows = (int32_t) numOfRows;
// clean up. // clean up.
destroySTableDataBlocksListBuilder(builder); destroySTableDataBlocksListBuilder(builder);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册