diff --git a/src/client/inc/tscDataBlockMerge.h b/src/client/inc/tscDataBlockMerge.h new file mode 100644 index 0000000000000000000000000000000000000000..a5b0b6e36d5593653d96448e03e32c65a4fd3386 --- /dev/null +++ b/src/client/inc/tscDataBlockMerge.h @@ -0,0 +1,273 @@ +/* +* Copyright (c) 2019 TAOS Data, Inc. +* +* This program is free software: you can use, redistribute, and/or modify +* it under the terms of the GNU Affero General Public License, version 3 +* or later ("AGPL"), as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, but WITHOUT +* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +* FITNESS FOR A PARTICULAR PURPOSE. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see . +*/ + +#ifndef TDENGINE_TSCDATABLOCKMERGE_H +#define TDENGINE_TSCDATABLOCKMERGE_H + +#include "hash.h" +#include "taosmsg.h" +#include "tarray.h" +#include "tscUtil.h" +#include "tsclient.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * A builder of SSubmitBlk. + */ +typedef struct SSubmitBlkBuilder { + // the metadata of the SSubmitBlk. + SSubmitBlk* metadata; + + // the array stores all the rows in a table, aka SArray. + SArray* rows; + +} SSubmitBlkBuilder; + +/** + * The builder to build SSubmitMsg::blocks. + */ +typedef struct SSubmitMsgBlocksBuilder { + // SHashObj. + SHashObj* blockBuilders; + int64_t vgId; +} SSubmitMsgBlocksBuilder; + +/** + * STableDataBlocksBuilder is a tool to build data blocks by append the existing data blocks in a vnode. + */ +typedef struct STableDataBlocksBuilder { + SSubmitMsgBlocksBuilder* blocksBuilder; + STableDataBlocks* firstBlock; + int64_t vgId; +} STableDataBlocksBuilder; + +/** + * STableDataBlocksListBuilder is a tool to build vnode data blocks list by appending exist data blocks. + */ +typedef struct STableDataBlocksListBuilder { + SHashObj* dataBlocksBuilders; +} STableDataBlocksListBuilder; + +/** + * A Builder to build SInsertStatementParam::pTableNameList. + */ +typedef struct STableNameListBuilder { + // store the unsorted table names, SArray. + SArray* tableNames; +} STableNameListBuilder; + +/** + * Create a SSubmitBlkBuilder using exist metadata. + * + * @param metadata the metadata. + * @return the SSubmitBlkBuilder. + */ +SSubmitBlkBuilder* createSSubmitBlkBuilder(SSubmitBlk* metadata); + +/** + * Destroy the SSubmitBlkBuilder. + * + * @param builder the SSubmitBlkBuilder. + */ +void destroySSubmitBlkBuilder(SSubmitBlkBuilder* builder); + +/** + * Append a SSubmitBlk* to the builder. The table uid in pBlock must be the same with the builder's. + * + * @param builder the SSubmitBlkBuilder. + * @param pBlock the pBlock to append. + * @return whether the append is success. + */ +bool appendSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk *pBlock); + + +/** + * Build and write SSubmitBlk to `target` + * + * @param builder the SSubmitBlkBuilder. + * @param target the target to write. + * @param nRows the number of rows in SSubmitBlk*. + * @return the writen bytes. + */ +size_t writeSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* target, size_t* nRows); + +/** + * Get the expected writen bytes of `writeSSubmitBlkBuilder`. + * + * @param builder the SSubmitBlkBuilder. + * @return the expected writen bytes of `writeSSubmitBlkBuilder`. + */ +size_t nWriteSSubmitBlkBuilder(SSubmitBlkBuilder* builder); + +/** + * Create a SSubmitMsgBuilder. + * + * @param vgId the vgId of SSubmitMsg. + * @return the SSubmitMsgBuilder. + */ +SSubmitMsgBlocksBuilder* createSSubmitMsgBuilder(int64_t vgId); + +/** + * Get the expected writen bytes of `writeSSubmitMsgBlocksBuilder`. + * + * @param builder the SSubmitMsgBlocksBuilder. + * @return the expected writen bytes of `writeSSubmitMsgBlocksBuilder`. + */ +size_t nWriteSSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder); + +/** + * Build and write SSubmitMsg::blocks to `pBlocks` + * + * @param builder the SSubmitBlkBuilder. + * @param pBlocks the target to write. + * @param nRows the number of row in SSubmitMsg::blocks. + * @return the writen bytes. + */ +size_t writeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t* nRows); + +/** + * Get the number of block in SSubmitMsgBlocksBuilder. + * @param builder the SSubmitMsgBlocksBuilder. + * @return the number of SSubmitBlk block. + */ +size_t nBlockSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder); + +/** + * Destroy the SSubmitMsgBlocksBuilder. + * + * @param builder the SSubmitMsgBlocksBuilder to destroy. + */ +void destroySSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder); + +/** + * Append SSubmitMsg* to the SSubmitMsgBlocksBuilder. + * + * @param builder the SSubmitMsgBlocksBuilder. + * @param pBlocks the SSubmitBlk in SSubmitMsg::blocks. + * @param nBlocks the number of blocks in SSubmitMsg. + * @return whether the append is success. + */ +bool appendSSubmitMsgBlocks(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t nBlocks); + +/** + * Create the STableDataBlocksBuilder. + * + * @param vgId the vgId of STableDataBlocksBuilder. + * @return the STableDataBlocksBuilder. + */ +STableDataBlocksBuilder* createSTableDataBlocksBuilder(int64_t vgId); + +/** + * Destroy the STableDataBlocksBuilder. + * @param builder the STableDataBlocksBuilder. + */ +void destroySTableDataBlocksBuilder(STableDataBlocksBuilder *builder); + +/** + * Append a data blocks to STableDataBlocksBuilder. + * @param builder the STableDataBlocksBuilder. + * @param dataBlocks the dataBlocks to append. the vgId of dataBlocks must be same with the STableDataBlocksBuilder. + * @return whether the append is success. + */ +bool appendSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, STableDataBlocks* dataBlocks); + +/** + * Build the data blocks for single vnode. + * @param builder the STableDataBlocksBuilder. + * @param nRows the number of row in STableDataBlocks. + * @return the data blocks for single vnode. + */ +STableDataBlocks* buildSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, size_t* nRows); + +/** + * Create the STableDataBlocksListBuilder. + * + * @return the STableDataBlocksListBuilder. + */ +STableDataBlocksListBuilder* createSTableDataBlocksListBuilder(); + +/** + * Destroy the STableDataBlocksListBuilder. + * + * @param builder the STableDataBlocksListBuilder. + */ +void destroySTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder); + +/** + * Append a data blocks to STableDataBlocksListBuilder. + * + * @param builder the STableDataBlocksListBuilder. + * @param dataBlocks the data blocks. + * @return whether the append is success. + */ +bool appendSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, STableDataBlocks* dataBlocks); + +/** + * Build the vnode data blocks list. + * + * @param builder the STableDataBlocksListBuilder. + * @param nTables the number of table in vnode data blocks list. + * @param nRows the number of row in vnode data blocks list. + * @return the vnode data blocks list. + */ +SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, size_t* nTables, size_t* nRows); + +/** + * Create STableNameListBuilder. + */ +STableNameListBuilder* createSTableNameListBuilder(); + +/** + * Destroy the STableNameListBuilder. + * @param builder the STableNameListBuilder. + */ +void destroySTableNameListBuilder(STableNameListBuilder* builder); + +/** + * Insert a SName to builder. + * + * @param builder the STableNameListBuilder. + * @param name the table name. + * @return whether it is success. + */ +bool insertSTableNameListBuilder(STableNameListBuilder* builder, SName* name); + +/** + * Build the STable name list. + * + * @param builder the STableNameListBuilder. + * @param numOfTables the number of table. + * @return the STable name list. + */ +SName** buildSTableNameListBuilder(STableNameListBuilder* builder, size_t* numOfTables); + +/** + * Merge the KV-PayLoad SQL objects into single one. + * The statements here must be an insertion statement and no schema attached. + * + * @param statements the array of statements. a.k.a SArray. + * @param result the returned result. result is not null! + * @return the status code. usually TSDB_CODE_SUCCESS. + */ +int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_TSCDATABLOCKMERGE_H diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index c59ad5ba267c3e383b6a1538d61fd66ca7caeb4f..2699f4dbda4f74b204ba89f108b9c2a91d6cb4e9 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -149,7 +149,6 @@ void* tscDestroyUdfArrayList(SArray* pUdfList); void* tscDestroyBlockHashTable(SSqlObj* pSql, SHashObj* pBlockHashTable, bool removeMeta); int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock); -int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result); int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap); int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, SName* pName, STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList); diff --git a/src/client/src/tscBulkWrite.c b/src/client/src/tscBulkWrite.c index e62a2dc191bdb2c1b3244f1713954bcc8c4fe86a..1a970a7cfe448da8fead8645a51cc10766638b19 100644 --- a/src/client/src/tscBulkWrite.c +++ b/src/client/src/tscBulkWrite.c @@ -15,6 +15,7 @@ #include "osAtomic.h" +#include "tscDataBlockMerge.h" #include "tscBulkWrite.h" #include "tscLog.h" #include "tscSubquery.h" diff --git a/src/client/src/tscDataBlockMerge.c b/src/client/src/tscDataBlockMerge.c new file mode 100644 index 0000000000000000000000000000000000000000..197e6561b869518e81a6f9bbe17466957c494044 --- /dev/null +++ b/src/client/src/tscDataBlockMerge.c @@ -0,0 +1,545 @@ +/* +* Copyright (c) 2019 TAOS Data, Inc. +* +* This program is free software: you can use, redistribute, and/or modify +* it under the terms of the GNU Affero General Public License, version 3 +* or later ("AGPL"), as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, but WITHOUT +* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +* FITNESS FOR A PARTICULAR PURPOSE. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see . +*/ + +#include "tscDataBlockMerge.h" + +/** + * A util function to compare two SName. + */ +static int32_t compareSName(const void *x, const void *y) { + SName* left = *((SName **) x); + SName* right = *((SName **) y); + if (left == right) { + return 0; + } + return strncmp((const char*) left, (const char*) right, sizeof(SName)); +} + +/** + * A util function to sort SArray by key. + */ +static int32_t compareSMemRow(const void *x, const void *y) { + TSKEY left = memRowKey(*(void **) x); + TSKEY right = memRowKey(*(void **) y); + if (left == right) { + return 0; + } else { + return left > right ? 1 : -1; + } +} + +/** + * If the SSubmitBlkBuilder of pBlock->uid is present, returns it. Otherwise, build a new SSubmitBlkBuilder. + * + * @param builder the SSubmitMsgBlocksBuilder. + * @param pBlock the SSubmitBlk. + * @return the SSubmitBlkBuilder (NULL means failure). + */ +inline static SSubmitBlkBuilder* computeIfAbsentSSubmitBlkBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlock) { + SSubmitBlkBuilder** iter = taosHashGet(builder->blockBuilders, &pBlock->uid, sizeof(pBlock->uid)); + SSubmitBlkBuilder* blocksBuilder = NULL; + if (iter) { + return *iter; + } + + blocksBuilder = createSSubmitBlkBuilder(pBlock); + if (!blocksBuilder) { + return NULL; + } + + if (taosHashPut(builder->blockBuilders, &pBlock->uid, sizeof(pBlock->uid), &blocksBuilder, sizeof(SArray*))) { + destroySSubmitBlkBuilder(blocksBuilder); + return NULL; + } + + return blocksBuilder; +} + +SName** buildSTableNameListBuilder(STableNameListBuilder* builder, size_t* numOfTables) { + if (!taosArrayGetSize(builder->tableNames)) { + *numOfTables = 0; + return NULL; + } + + // sort and unique. + taosArraySort(builder->tableNames, compareSName); + size_t tail = 0; + for (size_t i = 1; i < taosArrayGetSize(builder->tableNames); ++i) { + SName* last = taosArrayGetP(builder->tableNames, tail); + SName* current = taosArrayGetP(builder->tableNames, i); + if (compareSName(last, current) != 0) { + ++tail; + taosArraySet(builder->tableNames, tail, ¤t); + } + } + + // build table names list. + SName** tableNames = calloc(tail + 1, sizeof(SName*)); + if (!tableNames) { + return NULL; + } + + // clone data. + for (size_t i = 0; i <= tail; ++i) { + SName* clone = malloc(sizeof(SName)); + if (!clone) { + goto error; + } + memcpy(clone, taosArrayGetP(builder->tableNames, i), sizeof(SName)); + tableNames[i] = clone; + } + + *numOfTables = tail + 1; + return tableNames; + +error: + for (size_t i = 0; i <= tail; ++i) { + if (tableNames[i]) { + free(tableNames[i]); + } + } + free(tableNames); + return NULL; +} + +SSubmitBlkBuilder* createSSubmitBlkBuilder(SSubmitBlk* metadata) { + SSubmitBlkBuilder* builder = calloc(1, sizeof(SSubmitBlkBuilder)); + if (!builder) { + return NULL; + } + + builder->rows = taosArrayInit(1, sizeof(SMemRow)); + if (!builder->rows) { + free(builder); + return NULL; + } + + builder->metadata = calloc(1, sizeof(SSubmitBlk)); + if (!builder->metadata) { + taosArrayDestroy(&builder->rows); + free(builder); + return NULL; + } + memcpy(builder->metadata, metadata, sizeof(SSubmitBlk)); + + return builder; +} + +void destroySSubmitBlkBuilder(SSubmitBlkBuilder* builder) { + if (!builder) { + return; + } + taosArrayDestroy(&builder->rows); + free(builder->metadata); + free(builder); +} + +bool appendSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* pBlock) { + assert(pBlock->uid == builder->metadata->uid); + assert(pBlock->schemaLen == 0); + + // shadow copy all the SMemRow to SSubmitBlkBuilder::rows. + char* pRow = pBlock->data; + char* pEnd = pBlock->data + htonl(pBlock->dataLen); + while (pRow < pEnd) { + if (!taosArrayPush(builder->rows, &pRow)) { + return false; + } + pRow += memRowTLen(pRow); + } + + return true; +} + +size_t writeSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* target, size_t* nRows) { + memcpy(target, builder->metadata, sizeof(SSubmitBlk)); + + // sort SSubmitBlkBuilder::rows by timestamp. + uint32_t dataLen = 0; + taosArraySort(builder->rows, compareSMemRow); + + // deep copy all the SMemRow to target. + for (int i = 0; i < taosArrayGetSize(builder->rows); ++i) { + char* pRow = taosArrayGetP(builder->rows, i); + memcpy(POINTER_SHIFT(target->data, dataLen), pRow, memRowTLen(pRow)); + dataLen += memRowTLen(pRow); + } + + *nRows = taosArrayGetSize(builder->rows); + + target->schemaLen = 0; + target->dataLen = (int32_t) htonl(dataLen); + target->numOfRows = (int16_t) htons(*nRows); + + return dataLen + sizeof(SSubmitBlk); +} + +size_t nWriteSSubmitBlkBuilder(SSubmitBlkBuilder* builder) { + size_t dataLen = 0; + for (int i = 0; i < taosArrayGetSize(builder->rows); ++i) { + char* pRow = taosArrayGetP(builder->rows, i); + dataLen += memRowTLen(pRow); + } + return dataLen + sizeof(SSubmitBlk); +} + +SSubmitMsgBlocksBuilder* createSSubmitMsgBuilder(int64_t vgId) { + SSubmitMsgBlocksBuilder* builder = calloc(1, sizeof(SSubmitMsgBlocksBuilder)); + if (!builder) { + return NULL; + } + builder->vgId = vgId; + + builder->blockBuilders = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + if (!builder->blockBuilders) { + free(builder); + return NULL; + } + return builder; +} + +size_t nWriteSSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder) { + size_t nWrite = 0; + SSubmitBlkBuilder** iter = taosHashIterate(builder->blockBuilders, NULL); + while (iter) { + SSubmitBlkBuilder* blocksBuilder = *iter; + nWrite += nWriteSSubmitBlkBuilder(blocksBuilder); + iter = taosHashIterate(builder->blockBuilders, iter); + } + return nWrite; +} + +size_t writeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t* nRows) { + size_t nWrite = 0; + SSubmitBlkBuilder** iter = taosHashIterate(builder->blockBuilders, NULL); + + // copy all the SSubmitBlk to pBlocks. + while (iter) { + size_t nSubRows = 0; + SSubmitBlkBuilder* blocksBuilder = *iter; + SSubmitBlk* pBlock = POINTER_SHIFT(pBlocks, nWrite); + nWrite += writeSSubmitBlkBuilder(blocksBuilder, pBlock, &nSubRows); + *nRows += nSubRows; + iter = taosHashIterate(builder->blockBuilders, iter); + } + return nWrite; +} + +size_t nBlockSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder) { + return taosHashGetSize(builder->blockBuilders); +} + +void destroySSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder) { + if (!builder) { + return; + } + + SSubmitBlkBuilder** iter = taosHashIterate(builder->blockBuilders, NULL); + while (iter) { + destroySSubmitBlkBuilder(*iter); + iter = taosHashIterate(builder->blockBuilders, iter); + } + taosHashCleanup(builder->blockBuilders); + free(builder); +} + +bool appendSSubmitMsgBlocks(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t nBlocks) { + SSubmitBlk* pBlock = pBlocks; + for (size_t i = 0; i < nBlocks; ++i) { + // not support SSubmitBlk with schema. + assert(pBlock->schemaLen == 0); + + // get the builder of specific table (by pBlock->uid). + SSubmitBlkBuilder* blocksBuilder = computeIfAbsentSSubmitBlkBuilder(builder, pBlock); + if (!blocksBuilder) { + return false; + } + + if (!appendSSubmitBlkBuilder(blocksBuilder, pBlock)) { + return false; + } + + // go to next block. + size_t blockSize = sizeof (SSubmitBlk) + htonl(pBlock->dataLen); + pBlock = POINTER_SHIFT(pBlock, blockSize); + } + return true; +} + +STableDataBlocksBuilder* createSTableDataBlocksBuilder(int64_t vgId) { + STableDataBlocksBuilder* builder = calloc(1, sizeof(STableDataBlocksBuilder)); + if (!builder) { + return NULL; + } + + builder->blocksBuilder = createSSubmitMsgBuilder(vgId); + if (!builder->blocksBuilder) { + free(builder); + return NULL; + } + + builder->vgId = vgId; + builder->firstBlock = NULL; + return builder; +} + +void destroySTableDataBlocksBuilder(STableDataBlocksBuilder* builder) { + if (!builder) { + return; + } + + destroySSubmitMsgBuilder(builder->blocksBuilder); + free(builder); +} + +bool appendSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, STableDataBlocks* dataBlocks) { + // the data blocks vgId must be same with builder vgId. + if (!dataBlocks || dataBlocks->vgId != builder->vgId) { + return false; + } + + if (!builder->firstBlock) { + builder->firstBlock = dataBlocks; + } + + SSubmitBlk* pBlocks = (SSubmitBlk *)(dataBlocks->pData + dataBlocks->headerSize); + return appendSSubmitMsgBlocks(builder->blocksBuilder, pBlocks, dataBlocks->numOfTables); +} + +STableDataBlocks* buildSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, size_t* nRows) { + SSubmitMsgBlocksBuilder* blocksBuilder = builder->blocksBuilder; + STableDataBlocks *firstBlock = builder->firstBlock; + if (!firstBlock) { + return NULL; + } + + size_t nWriteSize = nWriteSSubmitMsgBuilder(builder->blocksBuilder); + size_t nHeaderSize = firstBlock->headerSize; + size_t nAllocSize = nWriteSize + nHeaderSize; + + // allocate data blocks. + STableDataBlocks* dataBlocks = NULL; + int32_t code = tscCreateDataBlock(nAllocSize, 0, (int32_t) nHeaderSize, &firstBlock->tableName, firstBlock->pTableMeta, &dataBlocks); + if (code != TSDB_CODE_SUCCESS) { + return NULL; + } + + // build the header (using first block). + dataBlocks->size = nHeaderSize; + memcpy(dataBlocks->pData, firstBlock->pData, nHeaderSize); + + // build the SSubmitMsg::blocks. + dataBlocks->size += writeSSubmitMsgBlocksBuilder(blocksBuilder, (SSubmitBlk *) (dataBlocks->pData + nHeaderSize), nRows); + dataBlocks->numOfTables = (int32_t) nBlockSSubmitMsgBlocksBuilder(blocksBuilder); + return dataBlocks; +} + +STableDataBlocksListBuilder* createSTableDataBlocksListBuilder() { + STableDataBlocksListBuilder* builder = calloc(1, sizeof(STableDataBlocksListBuilder)); + if (!builder) { + return NULL; + } + + builder->dataBlocksBuilders = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + if (!builder->dataBlocksBuilders) { + free(builder); + return NULL; + } + + return builder; +} + +void destroySTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder) { + if (!builder) { + return; + } + + STableDataBlocksBuilder** iter = taosHashIterate(builder->dataBlocksBuilders, NULL); + while (iter) { + destroySTableDataBlocksBuilder(*iter); + iter = taosHashIterate(builder->dataBlocksBuilders, iter); + } + taosHashCleanup(builder->dataBlocksBuilders); + free(builder); +} + +bool appendSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, STableDataBlocks* dataBlocks) { + // get the data blocks builder of specific vgId. + STableDataBlocksBuilder** item = taosHashGet(builder->dataBlocksBuilders, &dataBlocks->vgId, sizeof(dataBlocks->vgId)); + STableDataBlocksBuilder* blocksBuilder = NULL; + if (item) { + blocksBuilder = *item; + } else { + blocksBuilder = createSTableDataBlocksBuilder(dataBlocks->vgId); + if (!blocksBuilder) { + return false; + } + + if (taosHashPut(builder->dataBlocksBuilders, &dataBlocks->vgId, sizeof(dataBlocks->vgId), &blocksBuilder, sizeof(STableDataBlocksBuilder*))) { + destroySTableDataBlocksBuilder(blocksBuilder); + return false; + } + } + + // append to this builder. + return appendSTableDataBlocksBuilder(blocksBuilder, dataBlocks); +} + +SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, size_t* nTables, size_t* nRows) { + SArray* pVnodeDataBlockList = taosArrayInit(taosHashGetSize(builder->dataBlocksBuilders), sizeof(STableDataBlocks*)); + if (!pVnodeDataBlockList) { + return NULL; + } + + // build data blocks of each vgId. + STableDataBlocksBuilder** iter = taosHashIterate(builder->dataBlocksBuilders, NULL); + while (iter) { + size_t nSubRows = 0; + STableDataBlocksBuilder* dataBlocksBuilder = *iter; + STableDataBlocks* dataBlocks = buildSTableDataBlocksBuilder(dataBlocksBuilder, &nSubRows); + if (!dataBlocks) { + goto error; + } + *nTables += dataBlocks->numOfTables; + *nRows += nSubRows; + + taosArrayPush(pVnodeDataBlockList, &dataBlocks); + iter = taosHashIterate(builder->dataBlocksBuilders, iter); + } + return pVnodeDataBlockList; + +error: + for (int i = 0; i < taosArrayGetSize(pVnodeDataBlockList); ++i) { + STableDataBlocks* dataBlocks = taosArrayGetP(pVnodeDataBlockList, i); + tscDestroyDataBlock(NULL, dataBlocks, false); + } + taosArrayDestroy(&pVnodeDataBlockList); + return NULL; +} + +STableNameListBuilder* createSTableNameListBuilder() { + STableNameListBuilder* builder = calloc(1, sizeof(STableNameListBuilder)); + if (!builder) { + return NULL; + } + + builder->tableNames = taosArrayInit(1, sizeof(SName*)); + if (!builder->tableNames) { + free(builder); + return NULL; + } + + return builder; +} + +void destroySTableNameListBuilder(STableNameListBuilder* builder) { + if (!builder) { + return; + } + taosArrayDestroy(&builder->tableNames); + free(builder); +} + +bool insertSTableNameListBuilder(STableNameListBuilder* builder, SName* name) { + return taosArrayPush(builder->tableNames, &name); +} + +int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj* result) { + // statement array is empty. + if (!statements || !taosArrayGetSize(statements)) { + return TSDB_CODE_TSC_INVALID_OPERATION; + } + + STableDataBlocksListBuilder* builder = createSTableDataBlocksListBuilder(); + if (!builder) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + STableNameListBuilder* nameListBuilder = createSTableNameListBuilder(); + if (!nameListBuilder) { + destroySTableDataBlocksListBuilder(builder); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + // append the existing data blocks to builder. + for (size_t i = 0; i < taosArrayGetSize(statements); ++i) { + SSqlObj *pSql = taosArrayGetP(statements, i); + SInsertStatementParam* pInsertParam = &pSql->cmd.insertParam; + if (!pInsertParam->pDataBlocks) { + continue; + } + + assert(pInsertParam->payloadType == PAYLOAD_TYPE_KV); + assert(!pInsertParam->schemaAttached); + + // append each vnode data block to the builder. + for (size_t j = 0; j < taosArrayGetSize(pInsertParam->pDataBlocks); ++j) { + STableDataBlocks * tableBlock = taosArrayGetP(pInsertParam->pDataBlocks, j); + if (!appendSTableDataBlocksListBuilder(builder, tableBlock)) { + destroySTableDataBlocksListBuilder(builder); + destroySTableNameListBuilder(nameListBuilder); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + for (int k = 0; k < pInsertParam->numOfTables; ++k) { + if (!insertSTableNameListBuilder(nameListBuilder, pInsertParam->pTableNameList[k])) { + destroySTableDataBlocksListBuilder(builder); + destroySTableNameListBuilder(nameListBuilder); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + } + } + } + + // build the vnode data blocks. + size_t nBlocks = 0; + size_t nRows = 0; + SInsertStatementParam* pInsertParam = &result->cmd.insertParam; + SArray* pVnodeDataBlocksList = buildSTableDataBlocksListBuilder(builder, &nBlocks, &nRows); + if (!pVnodeDataBlocksList) { + destroySTableDataBlocksListBuilder(builder); + destroySTableNameListBuilder(nameListBuilder); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + // build the table name list. + size_t nTables = 0; + SName** pTableNameList = buildSTableNameListBuilder(nameListBuilder, &nTables); + if (!pTableNameList) { + destroySTableDataBlocksListBuilder(builder); + destroySTableNameListBuilder(nameListBuilder); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + assert(nTables == nBlocks); + + // replace table name list. + if (pInsertParam->pTableNameList) { + destroyTableNameList(pInsertParam); + } + pInsertParam->pTableNameList = pTableNameList; + pInsertParam->numOfTables = (int32_t) nTables; + + // replace vnode data blocks. + if (pInsertParam->pDataBlocks) { + tscDestroyBlockArrayList(result, pInsertParam->pDataBlocks); + } + pInsertParam->pDataBlocks = pVnodeDataBlocksList; + pInsertParam->numOfRows = (int32_t) nRows; + + // clean up. + destroySTableDataBlocksListBuilder(builder); + destroySTableNameListBuilder(nameListBuilder); + return TSDB_CODE_SUCCESS; +} diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 83e5804a9f59979ec83abe870cafd9d00c1da6ad..5a9ba219674a37b16c63753efb2afcad29b723a4 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2198,715 +2198,6 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { return result; } -/** - * A builder of SSubmitBlk. - */ -typedef struct SSubmitBlkBuilder { - // the metadata of the SSubmitBlk. - SSubmitBlk* metadata; - - // the array stores all the rows in a table, aka SArray. - SArray* rows; - -} SSubmitBlkBuilder; - -/** - * Create a SSubmitBlkBuilder using exist metadata. - * - * @param metadata the metadata. - * @return the SSubmitBlkBuilder. - */ -SSubmitBlkBuilder* createSSubmitBlkBuilder(SSubmitBlk* metadata) { - SSubmitBlkBuilder* builder = calloc(1, sizeof(SSubmitBlkBuilder)); - if (!builder) { - return NULL; - } - - builder->rows = taosArrayInit(1, sizeof(SMemRow)); - if (!builder->rows) { - free(builder); - return NULL; - } - - builder->metadata = calloc(1, sizeof(SSubmitBlk)); - if (!builder->metadata) { - taosArrayDestroy(&builder->rows); - free(builder); - return NULL; - } - memcpy(builder->metadata, metadata, sizeof(SSubmitBlk)); - - return builder; -} - -/** - * Destroy the SSubmitBlkBuilder. - * - * @param builder the SSubmitBlkBuilder. - */ -void destroySSubmitBlkBuilder(SSubmitBlkBuilder* builder) { - if (!builder) { - return; - } - taosArrayDestroy(&builder->rows); - free(builder->metadata); - free(builder); -} - -/** - * Append a SSubmitBlk* to the builder. The table uid in pBlock must be the same with the builder's. - * - * @param builder the SSubmitBlkBuilder. - * @param pBlock the pBlock to append. - * @return whether the append is success. - */ -static bool appendSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk *pBlock) { - assert(pBlock->uid == builder->metadata->uid); - assert(pBlock->schemaLen == 0); - - char* pRow = pBlock->data; - char* pEnd = pBlock->data + htonl(pBlock->dataLen); - while (pRow < pEnd) { - if (!taosArrayPush(builder->rows, &pRow)) { - return false; - } - pRow += memRowTLen(pRow); - } - - return true; -} - -/** - * A util function to sort SArray by key. - */ -static int32_t compareSMemRow(const void *x, const void *y) { - TSKEY left = memRowKey(*(void **) x); - TSKEY right = memRowKey(*(void **) y); - if (left == right) { - return 0; - } else { - return left > right ? 1 : -1; - } -} - -/** - * Build and write SSubmitBlk to `target` - * - * @param builder the SSubmitBlkBuilder. - * @param target the target to write. - * @param nRows the number of rows in SSubmitBlk*. - * @return the writen bytes. - */ -static size_t writeSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* target, size_t* nRows) { - memcpy(target, builder->metadata, sizeof(SSubmitBlk)); - - uint32_t dataLen = 0; - taosArraySort(builder->rows, compareSMemRow); - for (int i = 0; i < taosArrayGetSize(builder->rows); ++i) { - char* pRow = taosArrayGetP(builder->rows, i); - memcpy(POINTER_SHIFT(target->data, dataLen), pRow, memRowTLen(pRow)); - dataLen += memRowTLen(pRow); - } - - *nRows = taosArrayGetSize(builder->rows); - - target->schemaLen = 0; - target->dataLen = (int32_t) htonl(dataLen); - target->numOfRows = (int16_t) htons(*nRows); - - return dataLen + sizeof(SSubmitBlk); -} - -/** - * Get the expected writen bytes of `writeSSubmitBlkBuilder`. - * - * @param builder the SSubmitBlkBuilder. - * @return the expected writen bytes of `writeSSubmitBlkBuilder`. - */ -static size_t writenSizeSSubmitBlkBuilder(SSubmitBlkBuilder* builder) { - size_t dataLen = 0; - for (int i = 0; i < taosArrayGetSize(builder->rows); ++i) { - char* pRow = taosArrayGetP(builder->rows, i); - dataLen += memRowTLen(pRow); - } - return dataLen + sizeof(SSubmitBlk); -} - -/** - * The builder to build SSubmitMsg::blocks. - */ -typedef struct SSubmitMsgBlocksBuilder { - // SHashObj. - SHashObj* blockBuilders; - int64_t vgId; -} SSubmitMsgBlocksBuilder; - -/** - * Create a SSubmitMsgBuilder. - * - * @param vgId the vgId of SSubmitMsg. - * @return the SSubmitMsgBuilder. - */ -static SSubmitMsgBlocksBuilder* createSSubmitMsgBuilder(int64_t vgId) { - SSubmitMsgBlocksBuilder* builder = calloc(1, sizeof(SSubmitMsgBlocksBuilder)); - if (!builder) { - return NULL; - } - builder->vgId = vgId; - - builder->blockBuilders = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); - if (!builder->blockBuilders) { - free(builder); - return NULL; - } - return builder; -} - -/** - * Get the expected writen bytes of `writeSSubmitMsgBlocksBuilder`. - * - * @param builder the SSubmitMsgBlocksBuilder. - * @return the expected writen bytes of `writeSSubmitMsgBlocksBuilder`. - */ -static size_t writenSizeSSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder) { - size_t allocSize = 0; - SSubmitBlkBuilder** iter = taosHashIterate(builder->blockBuilders, NULL); - while (iter) { - SSubmitBlkBuilder* blocksBuilder = *iter; - allocSize += writenSizeSSubmitBlkBuilder(blocksBuilder); - iter = taosHashIterate(builder->blockBuilders, iter); - } - return allocSize; -} - -/** - * Build and write SSubmitMsg::blocks to `pBlocks` - * - * @param builder the SSubmitBlkBuilder. - * @param pBlocks the target to write. - * @param nRows the number of row in SSubmitMsg::blocks. - * @return the writen bytes. - */ -size_t writeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t* nRows) { - size_t nWrite = 0; - SSubmitBlkBuilder** iter = taosHashIterate(builder->blockBuilders, NULL); - - while (iter) { - size_t nSubRows = 0; - SSubmitBlkBuilder* blocksBuilder = *iter; - SSubmitBlk* pBlock = POINTER_SHIFT(pBlocks, nWrite); - nWrite += writeSSubmitBlkBuilder(blocksBuilder, pBlock, &nSubRows); - *nRows += nSubRows; - iter = taosHashIterate(builder->blockBuilders, iter); - } - return nWrite; -} - -/** - * Get the number of block in SSubmitMsgBlocksBuilder. - * @param builder the SSubmitMsgBlocksBuilder. - * @return the number of SSubmitBlk block. - */ -inline static size_t nBlockSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder) { - return taosHashGetSize(builder->blockBuilders); -} - -/** - * Destroy the SSubmitMsgBlocksBuilder. - * - * @param builder the SSubmitMsgBlocksBuilder to destroy. - */ -static void destroySSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder) { - if (!builder) { - return; - } - - SSubmitBlkBuilder** iter = taosHashIterate(builder->blockBuilders, NULL); - while (iter) { - destroySSubmitBlkBuilder(*iter); - iter = taosHashIterate(builder->blockBuilders, iter); - } - taosHashCleanup(builder->blockBuilders); - free(builder); -} - -/** - * If the SSubmitBlkBuilder of pBlock->uid is present, returns it. Otherwise, build a new SSubmitBlkBuilder. - * - * @param builder the SSubmitMsgBlocksBuilder. - * @param pBlock the SSubmitBlk. - * @return the SSubmitBlkBuilder (NULL means failure). - */ -static SSubmitBlkBuilder* computeIfAbsentSSubmitBlkBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlock) { - SSubmitBlkBuilder** iter = taosHashGet(builder->blockBuilders, &pBlock->uid, sizeof(pBlock->uid)); - SSubmitBlkBuilder* blocksBuilder = NULL; - if (iter) { - return *iter; - } - - blocksBuilder = createSSubmitBlkBuilder(pBlock); - if (!blocksBuilder) { - return NULL; - } - - if (taosHashPut(builder->blockBuilders, &pBlock->uid, sizeof(pBlock->uid), &blocksBuilder, sizeof(SArray*))) { - destroySSubmitBlkBuilder(blocksBuilder); - return NULL; - } - - return blocksBuilder; -} - -/** - * Append SSubmitMsg* to the SSubmitMsgBlocksBuilder. - * - * @param builder the SSubmitMsgBlocksBuilder. - * @param pMsg the SSubmitMsg* - * @param nBlocks the number of blocks in SSubmitMsg. - * @return whether the append is success. - */ -static bool appendSSubmitMsgBlocks(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t nBlocks) { - SSubmitBlk* pBlock = pBlocks; - for (size_t i = 0; i < nBlocks; ++i) { - assert(pBlock->schemaLen == 0); - SSubmitBlkBuilder* blocksBuilder = computeIfAbsentSSubmitBlkBuilder(builder, pBlock); - if (!blocksBuilder) { - return false; - } - - if (!appendSSubmitBlkBuilder(blocksBuilder, pBlock)) { - return false; - } - - size_t blockSize = sizeof (SSubmitBlk) + htonl(pBlock->dataLen); - pBlock = POINTER_SHIFT(pBlock, blockSize); - } - return true; -} - -/** - * STableDataBlocksBuilder is a tool to build data blocks by append the existing data blocks in a vnode. - */ -typedef struct STableDataBlocksBuilder { - SSubmitMsgBlocksBuilder* blocksBuilder; - STableDataBlocks* firstBlock; - int64_t vgId; -} STableDataBlocksBuilder; - -/** - * Create the STableDataBlocksBuilder. - * - * @param vgId the vgId of STableDataBlocksBuilder. - * @return the STableDataBlocksBuilder. - */ -static STableDataBlocksBuilder* createSTableDataBlocksBuilder(int64_t vgId) { - STableDataBlocksBuilder* builder = calloc(1, sizeof(STableDataBlocksBuilder)); - if (!builder) { - return NULL; - } - - builder->blocksBuilder = createSSubmitMsgBuilder(vgId); - if (!builder->blocksBuilder) { - free(builder); - return NULL; - } - - builder->vgId = vgId; - builder->firstBlock = NULL; - return builder; -} - -/** - * Destroy the STableDataBlocksBuilder. - * @param builder the STableDataBlocksBuilder. - */ -static void destroySTableDataBlocksBuilder(STableDataBlocksBuilder *builder) { - if (!builder) { - return; - } - - destroySSubmitMsgBuilder(builder->blocksBuilder); - free(builder); -} - -/** - * Append a data blocks to STableDataBlocksBuilder. - * @param builder the STableDataBlocksBuilder. - * @param dataBlocks the dataBlocks to append. the vgId of dataBlocks must be same with the STableDataBlocksBuilder. - * @return whether the append is success. - */ -static bool appendSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, STableDataBlocks* dataBlocks) { - if (!dataBlocks || dataBlocks->vgId != builder->vgId) { - return false; - } - - if (!builder->firstBlock) { - builder->firstBlock = dataBlocks; - } - - SSubmitBlk* pBlocks = (SSubmitBlk *)(dataBlocks->pData + dataBlocks->headerSize); - return appendSSubmitMsgBlocks(builder->blocksBuilder, pBlocks, dataBlocks->numOfTables); -} - -/** - * Build the data blocks for single vnode. - * @param builder the STableDataBlocksBuilder. - * @param nRows the number of row in STableDataBlocks. - * @return the data blocks for single vnode. - */ -static STableDataBlocks* buildSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, size_t* nRows) { - SSubmitMsgBlocksBuilder* blocksBuilder = builder->blocksBuilder; - STableDataBlocks *firstBlock = builder->firstBlock; - if (!firstBlock) { - return NULL; - } - - size_t nWriteSize = writenSizeSSubmitMsgBuilder(builder->blocksBuilder); - size_t nHeaderSize = firstBlock->headerSize; - size_t nAllocSize = nWriteSize + nHeaderSize; - - STableDataBlocks* dataBlocks = NULL; - int32_t code = tscCreateDataBlock(nAllocSize, 0, (int32_t) nHeaderSize, &firstBlock->tableName, firstBlock->pTableMeta, &dataBlocks); - if (code != TSDB_CODE_SUCCESS) { - return NULL; - } - - dataBlocks->size = nHeaderSize; - memcpy(dataBlocks->pData, firstBlock->pData, nHeaderSize); - dataBlocks->size += writeSSubmitMsgBlocksBuilder(blocksBuilder, (SSubmitBlk *) (dataBlocks->pData + nHeaderSize), nRows); - dataBlocks->numOfTables = (int32_t) nBlockSSubmitMsgBlocksBuilder(blocksBuilder); - return dataBlocks; -} - -/** - * STableDataBlocksListBuilder is a tool to build vnode data blocks list by appending exist data blocks. - */ -typedef struct STableDataBlocksListBuilder { - SHashObj* dataBlocksBuilders; -} STableDataBlocksListBuilder; - -/** - * Create the STableDataBlocksListBuilder. - * - * @return the STableDataBlocksListBuilder. - */ -static STableDataBlocksListBuilder* createSTableDataBlocksListBuilder() { - STableDataBlocksListBuilder* builder = calloc(1, sizeof(STableDataBlocksListBuilder)); - if (!builder) { - return NULL; - } - - builder->dataBlocksBuilders = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); - if (!builder->dataBlocksBuilders) { - free(builder); - return NULL; - } - - return builder; -} - -/** - * Destroy the STableDataBlocksListBuilder. - * - * @param builder the STableDataBlocksListBuilder. - */ -static void destroySTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder) { - if (!builder) { - return; - } - - STableDataBlocksBuilder** iter = taosHashIterate(builder->dataBlocksBuilders, NULL); - while (iter) { - destroySTableDataBlocksBuilder(*iter); - iter = taosHashIterate(builder->dataBlocksBuilders, iter); - } - taosHashCleanup(builder->dataBlocksBuilders); - free(builder); -} - -/** - * Append a data blocks to STableDataBlocksListBuilder. - * - * @param builder the STableDataBlocksListBuilder. - * @param dataBlocks the data blocks. - * @return whether the append is success. - */ -static bool appendSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, STableDataBlocks* dataBlocks) { - STableDataBlocksBuilder** item = taosHashGet(builder->dataBlocksBuilders, &dataBlocks->vgId, sizeof(dataBlocks->vgId)); - STableDataBlocksBuilder* blocksBuilder = NULL; - if (item) { - blocksBuilder = *item; - } else { - blocksBuilder = createSTableDataBlocksBuilder(dataBlocks->vgId); - if (!blocksBuilder) { - return false; - } - - if (taosHashPut(builder->dataBlocksBuilders, &dataBlocks->vgId, sizeof(dataBlocks->vgId), &blocksBuilder, sizeof(STableDataBlocksBuilder*))) { - destroySTableDataBlocksBuilder(blocksBuilder); - return false; - } - } - - return appendSTableDataBlocksBuilder(blocksBuilder, dataBlocks); -} - -/** - * Build the vnode data blocks list. - * - * @param builder the STableDataBlocksListBuilder. - * @param nTables the number of table in vnode data blocks list. - * @param nRows the number of row in vnode data blocks list. - * @return the vnode data blocks list. - */ -static SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, size_t* nTables, size_t* nRows) { - SArray* pVnodeDataBlockList = taosArrayInit(taosHashGetSize(builder->dataBlocksBuilders), sizeof(STableDataBlocks*)); - if (!pVnodeDataBlockList) { - return NULL; - } - - STableDataBlocksBuilder** iter = taosHashIterate(builder->dataBlocksBuilders, NULL); - while (iter) { - size_t nSubRows = 0; - STableDataBlocksBuilder* dataBlocksBuilder = *iter; - STableDataBlocks* dataBlocks = buildSTableDataBlocksBuilder(dataBlocksBuilder, &nSubRows); - if (!dataBlocks) { - goto error; - } - *nTables += dataBlocks->numOfTables; - *nRows += nSubRows; - - taosArrayPush(pVnodeDataBlockList, &dataBlocks); - iter = taosHashIterate(builder->dataBlocksBuilders, iter); - } - return pVnodeDataBlockList; - -error: - for (int i = 0; i < taosArrayGetSize(pVnodeDataBlockList); ++i) { - STableDataBlocks* dataBlocks = taosArrayGetP(pVnodeDataBlockList, i); - tscDestroyDataBlock(NULL, dataBlocks, false); - } - taosArrayDestroy(&pVnodeDataBlockList); - return NULL; -} - -/** - * A Builder to build SInsertStatementParam::pTableNameList. - */ -typedef struct STableNameListBuilder { - SArray* tableNames; -} STableNameListBuilder; - -/** - * Create STableNameListBuilder. - */ -STableNameListBuilder* createSTableNameListBuilder() { - STableNameListBuilder* builder = calloc(1, sizeof(STableNameListBuilder)); - if (!builder) { - return NULL; - } - - builder->tableNames = taosArrayInit(1, sizeof(SName*)); - if (!builder->tableNames) { - free(builder); - return NULL; - } - - return builder; -} - -/** - * Destroy the STableNameListBuilder. - * @param builder the STableNameListBuilder. - */ -void destroySTableNameListBuilder(STableNameListBuilder* builder) { - if (!builder) { - return; - } - taosArrayDestroy(&builder->tableNames); - free(builder); -} - -/** - * A util function to compare two SName. - */ -static int32_t compareSName(const void *x, const void *y) { - SName* left = *((SName **) x); - SName* right = *((SName **) y); - if (left == right) { - return 0; - } - return strncmp((const char*) left, (const char*) right, sizeof(SName)); -} - -/** - * Insert a SName to builder. - * - * @param builder the STableNameListBuilder. - * @param name the table name. - * @return whether it is success. - */ -bool insertSTableNameListBuilder(STableNameListBuilder* builder, SName* name) { - return taosArrayPush(builder->tableNames, &name); -} - -/** - * Build the STable name list. - * - * @param builder the STableNameListBuilder. - * @param numOfTables the number of table. - * @return the STable name list. - */ -SName** buildSTableNameListBuilder(STableNameListBuilder* builder, size_t* numOfTables) { - if (!taosArrayGetSize(builder->tableNames)) { - *numOfTables = 0; - return NULL; - } - - // sort and unique. - taosArraySort(builder->tableNames, compareSName); - size_t tail = 0; - for (size_t i = 1; i < taosArrayGetSize(builder->tableNames); ++i) { - SName* last = taosArrayGetP(builder->tableNames, tail); - SName* current = taosArrayGetP(builder->tableNames, i); - if (compareSName(last, current) != 0) { - ++tail; - taosArraySet(builder->tableNames, tail, ¤t); - } - } - - // build tableNameList - SName** tableNames = calloc(tail + 1, sizeof(SName*)); - if (!tableNames) { - return NULL; - } - - for (size_t i = 0; i <= tail; ++i) { - SName* clone = malloc(sizeof(SName)); - if (!clone) { - goto error; - } - memcpy(clone, taosArrayGetP(builder->tableNames, i), sizeof(SName)); - tableNames[i] = clone; - } - - *numOfTables = tail + 1; - return tableNames; - -error: - for (size_t i = 0; i <= tail; ++i) { - if (tableNames[i]) { - free(tableNames[i]); - } - } - free(tableNames); - return NULL; -} - -/** - * Merge the KV-PayLoad SQL objects into single one. - * The statements here must be an insertion statement and no schema attached. - * - * @param statements the array of statements. a.k.a SArray. - * @param result the returned result. result is not null! - * @return the status code. usually TSDB_CODE_SUCCESS. - */ -int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result) { - // statement array is empty. - if (!statements || !taosArrayGetSize(statements)) { - return TSDB_CODE_TSC_INVALID_OPERATION; - } - - STableDataBlocksListBuilder* builder = createSTableDataBlocksListBuilder(); - if (!builder) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - STableNameListBuilder* nameListBuilder = createSTableNameListBuilder(); - if (!nameListBuilder) { - destroySTableDataBlocksListBuilder(builder); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - // append the existing data blocks to builder. - for (size_t i = 0; i < taosArrayGetSize(statements); ++i) { - SSqlObj *pSql = taosArrayGetP(statements, i); - SInsertStatementParam* pInsertParam = &pSql->cmd.insertParam; - if (!pInsertParam->pDataBlocks) { - continue; - } - - assert(pInsertParam->payloadType == PAYLOAD_TYPE_KV); - assert(!pInsertParam->schemaAttached); - - // append each vnode data block to the builder. - for (size_t j = 0; j < taosArrayGetSize(pInsertParam->pDataBlocks); ++j) { - STableDataBlocks * tableBlock = taosArrayGetP(pInsertParam->pDataBlocks, j); - if (!appendSTableDataBlocksListBuilder(builder, tableBlock)) { - destroySTableDataBlocksListBuilder(builder); - destroySTableNameListBuilder(nameListBuilder); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - for (int k = 0; k < pInsertParam->numOfTables; ++k) { - if (!insertSTableNameListBuilder(nameListBuilder, pInsertParam->pTableNameList[k])) { - destroySTableDataBlocksListBuilder(builder); - destroySTableNameListBuilder(nameListBuilder); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - } - } - } - - // build the vnode data blocks. - size_t numOfBlocks = 0; - size_t numOfRows = 0; - SInsertStatementParam* pInsertParam = &result->cmd.insertParam; - SArray* pVnodeDataBlocksList = buildSTableDataBlocksListBuilder(builder, &numOfBlocks, &numOfRows); - if (!pVnodeDataBlocksList) { - destroySTableDataBlocksListBuilder(builder); - destroySTableNameListBuilder(nameListBuilder); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - // build the table name list. - size_t numOfTables = 0; - SName** pTableNameList = buildSTableNameListBuilder(nameListBuilder, &numOfTables); - if (!pTableNameList) { - destroySTableDataBlocksListBuilder(builder); - destroySTableNameListBuilder(nameListBuilder); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - assert(numOfTables == numOfBlocks); - - // replace table name list. - if (pInsertParam->pTableNameList) { - for (size_t i = 0; i < pInsertParam->numOfTables; ++i) { - if (pInsertParam->pTableNameList[i]) { - free(pInsertParam->pTableNameList[i]); - } - } - free(pInsertParam->pTableNameList); - } - pInsertParam->pTableNameList = pTableNameList; - pInsertParam->numOfTables = (int32_t) numOfTables; - - // replace vnode data blocks. - if (pInsertParam->pDataBlocks) { - for (size_t i = 0; i < taosArrayGetSize(pInsertParam->pDataBlocks); ++i) { - tscDestroyDataBlock(result, taosArrayGetP(pInsertParam->pDataBlocks, i), false); - } - taosArrayDestroy(&pInsertParam->pDataBlocks); - } - pInsertParam->pDataBlocks = pVnodeDataBlocksList; - pInsertParam->numOfRows = (int32_t) numOfRows; - - // clean up. - destroySTableDataBlocksListBuilder(builder); - destroySTableNameListBuilder(nameListBuilder); - return TSDB_CODE_SUCCESS; -} - int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap) { const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg); int code = 0; @@ -2920,7 +2211,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar // alloc table name list. size_t numOfTables = taosHashGetSize(pInsertParam->pTableBlockHashList); if (pInsertParam->pTableNameList) { - tfree(pInsertParam->pTableNameList); + destroyTableNameList(pInsertParam); } pInsertParam->pTableNameList = calloc(numOfTables, sizeof(SName*)); pInsertParam->numOfTables = (int32_t) numOfTables;