diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 0f27fbc027b7a4e2e2b568233bb56cf140a32bb1..c59ad5ba267c3e383b6a1538d61fd66ca7caeb4f 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -149,7 +149,7 @@ void* tscDestroyUdfArrayList(SArray* pUdfList); void* tscDestroyBlockHashTable(SSqlObj* pSql, SHashObj* pBlockHashTable, bool removeMeta); int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock); -int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result); +int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result); int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap); int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, SName* pName, STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList); diff --git a/src/client/src/tscBulkWrite.c b/src/client/src/tscBulkWrite.c index 972793a5d6bcbbc4427dd6b039cd9d9fdb512c1c..509bedb0f4e0b52d2d3095222591a820b73069c4 100644 --- a/src/client/src/tscBulkWrite.c +++ b/src/client/src/tscBulkWrite.c @@ -131,17 +131,23 @@ int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) { // merge the statements into single one. tscDebug("start to merge %zu sql objs", count); - int32_t code = tscMergeKVPayLoadSqlObj(statements, result); + SSqlObj *pSql = *((SSqlObj**)taosArrayGet(statements, 0)); + SSqlObj *pNew = createSimpleSubObj(pSql, batchResultCallback, context, TSDB_SQL_INSERT); + if (!pNew) { + free(context); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + int32_t code = tscMergeKVPayLoadSqlObj(statements, pNew); if (code != TSDB_CODE_SUCCESS) { const char* msg = tstrerror(code); tscDebug("failed to merge sql objects: %s", msg); free(context); - } else { - // set the merged sql object callback. - (*result)->fp = batchResultCallback; - (*result)->fetchFp = (*result)->fp; - (*result)->param = context; + taosReleaseRef(tscObjRef, pNew->self); + return code; } + + *result = pNew; return code; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index a8b5d91be31e18ec66c8709d047097a55b2df148..d18e48eff46d5896678e1d5f63c61d3a8384afb8 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2212,29 +2212,6 @@ static void extractTableNameList(SSqlObj *pSql, SInsertStatementParam *pInsertPa } } -/** - * Resize the the data blocks data. - * - * @param dataBlocks the data blocks. - * @param destSize the destination size. - * @return whether is success. - */ -static bool resizeDataBlocksData(STableDataBlocks* dataBlocks, size_t destSize) { - if (dataBlocks->nAllocSize >= destSize) { - return true; - } - - size_t nAllocSize = destSize + (destSize >> 1); - char *pData = realloc(dataBlocks->pData, nAllocSize); - if (!pData) { - return false; - } - - dataBlocks->pData = pData; - dataBlocks->nAllocSize = nAllocSize; - return true; -} - /** * A builder of SSubmitBlk. */ @@ -2279,7 +2256,7 @@ SSubmitBlkBuilder* createSSubmitBlkBuilder(SSubmitBlk* metadata) { /** * Destroy the SSubmitBlkBuilder. * - * @param builder + * @param builder the SSubmitBlkBuilder. */ void destroySSubmitBlkBuilder(SSubmitBlkBuilder* builder) { if (!builder) { @@ -2432,6 +2409,15 @@ size_t writeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk return nWrite; } +/** + * Get the number of block in SSubmitMsgBlocksBuilder. + * @param builder the SSubmitMsgBlocksBuilder. + * @return the number of SSubmitBlk block. + */ +size_t blockSizeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder) { + return taosHashGetSize(builder->blockBuilders); +} + /** * Destroy the SSubmitMsgBlocksBuilder. * @@ -2477,6 +2463,7 @@ static SSubmitBlkBuilder* computeIfAbsentSSubmitBlkBuilder(SSubmitMsgBlocksBuild return blocksBuilder; } + /** * Append SSubmitMsg* to the SSubmitMsgBlocksBuilder. * @@ -2504,6 +2491,210 @@ static bool appendSSubmitMsgBlocks(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* return true; } +/** + * STableDataBlocksBuilder is a tool to build data blocks by append the existing data blocks in a vnode. + */ +typedef struct STableDataBlocksBuilder { + SSubmitMsgBlocksBuilder* blocksBuilder; + STableDataBlocks* firstBlock; + int64_t vgId; +} STableDataBlocksBuilder; + +/** + * Create the STableDataBlocksBuilder. + * + * @param vgId the vgId of STableDataBlocksBuilder. + * @return the STableDataBlocksBuilder. + */ +static STableDataBlocksBuilder* createSTableDataBlocksBuilder(int64_t vgId) { + STableDataBlocksBuilder* builder = calloc(1, sizeof(STableDataBlocksBuilder)); + if (!builder) { + return NULL; + } + + builder->blocksBuilder = createSSubmitMsgBuilder(vgId); + if (!builder->blocksBuilder) { + free(builder); + return NULL; + } + + builder->vgId = vgId; + builder->firstBlock = NULL; + return builder; +} + +/** + * Destroy the STableDataBlocksBuilder. + * @param builder the STableDataBlocksBuilder. + */ +static void destroySTableDataBlocksBuilder(STableDataBlocksBuilder *builder) { + if (!builder) { + return; + } + + destroySSubmitMsgBuilder(builder->blocksBuilder); + free(builder); +} + +/** + * Append a data blocks to STableDataBlocksBuilder. + * @param builder the STableDataBlocksBuilder. + * @param dataBlocks the dataBlocks to append. the vgId of dataBlocks must be same with the STableDataBlocksBuilder. + * @return whether the append is success. + */ +static bool appendSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, STableDataBlocks* dataBlocks) { + if (!dataBlocks || dataBlocks->vgId != builder->vgId) { + return false; + } + + if (!builder->firstBlock) { + builder->firstBlock = dataBlocks; + } + + SSubmitBlk* pBlocks = (SSubmitBlk *)(dataBlocks->pData + dataBlocks->headerSize); + return appendSSubmitMsgBlocks(builder->blocksBuilder, pBlocks, dataBlocks->numOfTables); +} + +/** + * Build the data blocks for single vnode. + * @param builder the STableDataBlocksBuilder. + * @return the data blocks for single vnode. + */ +static STableDataBlocks* buildSTableDataBlocksBuilder(STableDataBlocksBuilder* builder) { + SSubmitMsgBlocksBuilder* blocksBuilder = builder->blocksBuilder; + STableDataBlocks *firstBlock = builder->firstBlock; + if (!firstBlock) { + return NULL; + } + + size_t nWriteSize = writenSizeSSubmitMsgBuilder(builder->blocksBuilder); + size_t nHeaderSize = firstBlock->headerSize; + size_t nAllocSize = nWriteSize + nHeaderSize; + + STableDataBlocks* dataBlocks = NULL; + int32_t code = tscCreateDataBlock(nAllocSize, 0, nHeaderSize, &firstBlock->tableName, firstBlock->pTableMeta, &dataBlocks); + if (code != TSDB_CODE_SUCCESS) { + return NULL; + } + + dataBlocks->size = nHeaderSize; + memcpy(dataBlocks->pData, firstBlock->pData, nHeaderSize); + dataBlocks->size += writeSSubmitMsgBlocksBuilder(blocksBuilder, (SSubmitBlk *) (dataBlocks->pData + nHeaderSize)); + dataBlocks->numOfTables = blockSizeSSubmitMsgBlocksBuilder(blocksBuilder); + return dataBlocks; +} + +/** + * STableDataBlocksListBuilder is a tool to build vnode data blocks list by appending exist data blocks. + */ +typedef struct STableDataBlocksListBuilder { + SHashObj* dataBlocksBuilders; +} STableDataBlocksListBuilder; + +/** + * Create the STableDataBlocksListBuilder. + * + * @return the STableDataBlocksListBuilder. + */ +static STableDataBlocksListBuilder* createSTableDataBlocksListBuilder() { + STableDataBlocksListBuilder* builder = calloc(1, sizeof(STableDataBlocksListBuilder)); + if (!builder) { + return NULL; + } + + builder->dataBlocksBuilders = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + if (!builder->dataBlocksBuilders) { + free(builder); + return NULL; + } + + return builder; +} + +/** + * Destroy the STableDataBlocksListBuilder. + * + * @param builder the STableDataBlocksListBuilder. + */ +static void destroySTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder) { + if (!builder) { + return; + } + + STableDataBlocksBuilder** iter = taosHashIterate(builder->dataBlocksBuilders, NULL); + while (iter) { + destroySTableDataBlocksBuilder(*iter); + iter = taosHashIterate(builder->dataBlocksBuilders, iter); + } + taosHashCleanup(builder->dataBlocksBuilders); + free(builder); +} + +/** + * Append a data blocks to STableDataBlocksListBuilder. + * + * @param builder the STableDataBlocksListBuilder. + * @param dataBlocks the data blocks. + * @return whether the append is success. + */ +static bool appendSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, STableDataBlocks* dataBlocks) { + STableDataBlocksBuilder** item = taosHashGet(builder->dataBlocksBuilders, &dataBlocks->vgId, sizeof(dataBlocks->vgId)); + STableDataBlocksBuilder* blocksBuilder = NULL; + if (item) { + blocksBuilder = *item; + } else { + blocksBuilder = createSTableDataBlocksBuilder(dataBlocks->vgId); + if (!blocksBuilder) { + return false; + } + + if (taosHashPut(builder->dataBlocksBuilders, &dataBlocks->vgId, sizeof(dataBlocks->vgId), &blocksBuilder, sizeof(STableDataBlocksBuilder*))) { + destroySTableDataBlocksBuilder(blocksBuilder); + return false; + } + } + + return appendSTableDataBlocksBuilder(blocksBuilder, dataBlocks); +} + +/** + * Build the vnode data blocks list. + * + * @param builder the STableDataBlocksListBuilder. + * @param numOfTables the number of tables. + * @return the vnode data blocks list. + */ +static SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, int32_t* numOfTables) { + SArray* pVnodeDataBlockList = taosArrayInit(taosHashGetSize(builder->dataBlocksBuilders), sizeof(STableDataBlocks*)); + if (!pVnodeDataBlockList) { + return NULL; + } + + STableDataBlocksBuilder** iter = taosHashIterate(builder->dataBlocksBuilders, NULL); + while (iter) { + STableDataBlocksBuilder* dataBlocksBuilder = *iter; + STableDataBlocks* dataBlocks = buildSTableDataBlocksBuilder(dataBlocksBuilder); + if (!dataBlocks) { + goto error; + } + + numOfTables += dataBlocks->numOfTables; + + taosArrayPush(pVnodeDataBlockList, &dataBlocks); + iter = taosHashIterate(builder->dataBlocksBuilders, iter); + } + + return pVnodeDataBlockList; + +error: + for (int i = 0; i < taosArrayGetSize(pVnodeDataBlockList); ++i) { + STableDataBlocks* dataBlocks = *((STableDataBlocks**)taosArrayGet(pVnodeDataBlockList, i)); + tscDestroyDataBlock(NULL, dataBlocks, false); + } + taosArrayDestroy(&pVnodeDataBlockList); + return NULL; +} + /** * Merge the KV-PayLoad SQL objects into single one. * The statements here must be an insertion statement and no schema attached. @@ -2512,129 +2703,54 @@ static bool appendSSubmitMsgBlocks(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* * @param result the returned result. result is not null! * @return the status code. usually TSDB_CODE_SUCCESS. */ -int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) { +int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result) { // statement array is empty. if (!statements || !taosArrayGetSize(statements)) { return TSDB_CODE_TSC_INVALID_OPERATION; } - // a.k.a SHashObj, the key value represents vgroup id. - SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); - if (!pVnodeDataBlockHashList) { + STableDataBlocksListBuilder* builder = createSTableDataBlocksListBuilder(); + if (!builder) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } - // let the first statement in the array to be the merged result. - SSqlObj* merged = *((SSqlObj**) taosArrayGet(statements, 0)); - SSqlCmd* pMergeCmd = &merged->cmd; - SInsertStatementParam* pMergeInsertParam = &pMergeCmd->insertParam; - SArray* pMergeDataBlocks = pMergeInsertParam->pDataBlocks; - - // initialize the `pVnodeDataBlockHashList`. - assert(pMergeInsertParam->payloadType == PAYLOAD_TYPE_KV); - assert(!pMergeInsertParam->schemaAttached); - for (int i = 0; i < taosArrayGetSize(pMergeInsertParam->pDataBlocks); ++i) { - STableDataBlocks *pDataBlocks = *((STableDataBlocks** )taosArrayGet(pMergeInsertParam->pDataBlocks, i)); - if (taosHashPut(pVnodeDataBlockHashList, &pDataBlocks->vgId, sizeof(pDataBlocks->vgId), &pDataBlocks, sizeof(STableDataBlocks *))) { - taosHashCleanup(pVnodeDataBlockHashList); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - } - - // merge sql obj statements[i] into sql obj `merged`. - for (int i = 1; i < taosArrayGetSize(statements); ++i) { + // append the existing data blocks to builder. + for (int i = 0; i < taosArrayGetSize(statements); ++i) { SSqlObj *pSql = *((SSqlObj**) taosArrayGet(statements, i)); - SSqlCmd *pCmd = &pSql->cmd; - SInsertStatementParam* pInsertParam = &pCmd->insertParam; + SInsertStatementParam* pInsertParam = &pSql->cmd.insertParam; + if (!pInsertParam->pDataBlocks) { + continue; + } assert(pInsertParam->payloadType == PAYLOAD_TYPE_KV); assert(!pInsertParam->schemaAttached); - // merge all the data blocks by vgroup id. - for (int j = 0; pInsertParam->pDataBlocks && j < taosArrayGetSize(pInsertParam->pDataBlocks); ++j) { + for (int j = 0; j < taosArrayGetSize(pInsertParam->pDataBlocks); ++j) { STableDataBlocks * tableBlock = *((STableDataBlocks **) taosArrayGet(pInsertParam->pDataBlocks, j)); - // SSubmitMsg *pBlocks = (SSubmitMsg *)tableBlock->pData; - - // get the data blocks of vgroup id. - STableDataBlocks *dataBuf = NULL; - STableDataBlocks** iter = taosHashGet(pVnodeDataBlockHashList, &tableBlock->vgId, sizeof(tableBlock->vgId)); - if (iter == NULL) { - dataBuf = tableBlock; - if (taosHashPut(pVnodeDataBlockHashList, &tableBlock->vgId, sizeof(tableBlock->vgId), &dataBuf, sizeof(STableDataBlocks *))) { - taosHashCleanup(pVnodeDataBlockHashList); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - if (!taosArrayPush(pMergeDataBlocks, &dataBuf)) { - taosHashCleanup(pVnodeDataBlockHashList); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - } else { - dataBuf = *iter; - } - - // header: SMsgDesc + SSubmitMsg(without SSubmitBlk[]) - assert(dataBuf->headerSize == (sizeof(SMsgDesc) + sizeof(SSubmitMsg))); - assert(dataBuf->headerSize == tableBlock->headerSize); - const size_t destSize = dataBuf->size + (tableBlock->size - tableBlock->headerSize); - - if (!resizeDataBlocksData(dataBuf, destSize)) { - tscError("0x%" PRIx64 " failed to allocate memory for merging submit block, size:%d", pInsertParam->objectId, - dataBuf->nAllocSize); - - taosHashCleanup(pVnodeDataBlockHashList); - tfree(dataBuf->pData); + if (!appendSTableDataBlocksListBuilder(builder, tableBlock)) { + destroySTableDataBlocksListBuilder(builder); return TSDB_CODE_TSC_OUT_OF_MEMORY; } - - memcpy(dataBuf->pData + dataBuf->size, tableBlock->pData + tableBlock->headerSize, tableBlock->size - tableBlock->headerSize); - dataBuf->size = destSize; - dataBuf->numOfTables += tableBlock->numOfTables; - tscDestroyDataBlock(pSql, tableBlock, false); } - - // free the data blocks and sql objs. (because it is no longer needed). - taosArrayDestroy(&pInsertParam->pDataBlocks); } + // build the vnode data blocks. + int32_t numOfTables = 0; + SInsertStatementParam* pInsertParam = &result->cmd.insertParam; + SArray* pVnodeDataBlocksList = buildSTableDataBlocksListBuilder(builder, &numOfTables); + if (!pVnodeDataBlocksList) { + destroySTableDataBlocksListBuilder(builder); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + pInsertParam->pDataBlocks = pVnodeDataBlocksList; + pInsertParam->numOfTables = numOfTables; + // clean up. - for (int i = 1; i < taosArrayGetSize(statements); ++i) { + for (int i = 0; i < taosArrayGetSize(statements); ++i) { SSqlObj *pSql = *((SSqlObj**) taosArrayGet(statements, i)); taosReleaseRef(tscObjRef, pSql->self); } - - taosHashCleanup(pVnodeDataBlockHashList); - *result = merged; - - // rebuild SubmitMsg::blocks. - for (int i = 0; i < taosArrayGetSize(pMergeDataBlocks); ++i) { - STableDataBlocks* pDataBlocks = *((STableDataBlocks **) taosArrayGet(pMergeDataBlocks, i)); - SSubmitMsgBlocksBuilder *builder = createSSubmitMsgBuilder(pDataBlocks->vgId); - if (!builder) { - break ; - } - - SSubmitBlk* pBlock = (SSubmitBlk *) (pDataBlocks->pData + pDataBlocks->headerSize); - if (!appendSSubmitMsgBlocks(builder, pBlock, pDataBlocks->numOfTables)) { - destroySSubmitMsgBuilder(builder); - break; - } - - size_t nAllocSize = pDataBlocks->headerSize + writenSizeSSubmitMsgBuilder(builder); - char *pData = calloc(1, nAllocSize); - if (!pData) { - destroySSubmitMsgBuilder(builder); - break; - } - - pDataBlocks->nAllocSize = nAllocSize; - pDataBlocks->size = pDataBlocks->headerSize + writeSSubmitMsgBlocksBuilder(builder, (SSubmitBlk*)(pData + pDataBlocks->headerSize)); - memcpy(pData, pDataBlocks->pData, pDataBlocks->headerSize); - free(pDataBlocks->pData); - pDataBlocks->pData = pData; - - destroySSubmitMsgBuilder(builder); - } + destroySTableDataBlocksListBuilder(builder); return TSDB_CODE_SUCCESS; }