diff --git a/src/client/inc/tscBulkWrite.h b/src/client/inc/tscBulkWrite.h index 33a6a356b16cd4d778f3c734d40016a566b95c47..a51d699ac87beb27037d600caa1691bbbff60a29 100644 --- a/src/client/inc/tscBulkWrite.h +++ b/src/client/inc/tscBulkWrite.h @@ -101,6 +101,7 @@ void destroyAsyncDispatcher(SAsyncBulkWriteDispatcher* dispatcher); * 1. auto batch feature on the sql object must be enabled. * 2. must be an `insert into ... value ...` statement. * 3. the payload type must be kv payload. + * 4. no schema attached. * * @param dispatcher the async dispatcher. * @param pSql the sql object to check. diff --git a/src/client/src/tscBulkWrite.c b/src/client/src/tscBulkWrite.c index 11b5c3902989b2d56fc72a0e1d4bb19e890a2cc2..65991e15724077875715bf62fd7860f17ceb1e29 100644 --- a/src/client/src/tscBulkWrite.c +++ b/src/client/src/tscBulkWrite.c @@ -431,6 +431,11 @@ bool tscSupportBulkInsertion(SAsyncBulkWriteDispatcher* dispatcher, SSqlObj* pSq return false; } + // no schema attached. + if (pInsertParam->schemaAttached) { + return false; + } + // too many insertion rows, fail back to normal insertion. if (statementGetInsertionRows(pSql) >= dispatcher->batchSize) { return false; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index a923757af8bb3d0a12c984f17c8e4fb2f1146e2b..479de72dd65254bffec618410ac21ad18d4c9456 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2198,25 +2198,314 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { return result; } -static void extractTableNameList(SSqlObj *pSql, SInsertStatementParam *pInsertParam) { +static void extractTableNameList(SSqlObj *pSql, SInsertStatementParam *pInsertParam, SArray* pTableDataBlockList) { pInsertParam->numOfTables = (int32_t) taosHashGetSize(pInsertParam->pTableBlockHashList); if (pInsertParam->pTableNameList == NULL) { pInsertParam->pTableNameList = malloc(pInsertParam->numOfTables * POINTER_BYTES); } - - STableDataBlocks **p1 = taosHashIterate(pInsertParam->pTableBlockHashList, NULL); - int32_t i = 0; - while(p1) { - STableDataBlocks* pBlocks = *p1; + + for (int i = 0; i < taosArrayGetSize(pTableDataBlockList); ++i) { + STableDataBlocks* pBlocks = *((STableDataBlocks**) taosArrayGet(pTableDataBlockList, i)); //tfree(pInsertParam->pTableNameList[i]); - pInsertParam->pTableNameList[i++] = tNameDup(&pBlocks->tableName); - p1 = taosHashIterate(pInsertParam->pTableBlockHashList, p1); + pInsertParam->pTableNameList[i] = tNameDup(&pBlocks->tableName); + } +} + +/** + * Resize the the data blocks data. + * + * @param dataBlocks the data blocks. + * @param destSize the destination size. + * @return whether is success. + */ +inline static bool resizeDataBlocksData(STableDataBlocks* dataBlocks, size_t destSize) { + if (dataBlocks->nAllocSize >= destSize) { + return true; + } + + size_t nAllocSize = destSize + (destSize >> 1); + char *pData = realloc(dataBlocks->pData, dataBlocks->nAllocSize); + if (!pData) { + return false; + } + + dataBlocks->pData = pData; + dataBlocks->nAllocSize = nAllocSize; + return false; +} + +/** + * A builder of SSubmitBlk. + */ +typedef struct SSubmitBlkBuilder { + // the metadata of the SSubmitBlk. + SSubmitBlk* metadata; + + // the array stores all the rows in a table, aka SArray. + SArray* rows; + +} SSubmitBlkBuilder; + +/** + * Create a SSubmitBlkBuilder using exist metadata. + * + * @param metadata the metadata. + * @return the SSubmitBlkBuilder. + */ +SSubmitBlkBuilder* createSSubmitBlkBuilder(SSubmitBlk* metadata) { + SSubmitBlkBuilder* builder = calloc(1, sizeof(SSubmitBlkBuilder)); + if (!builder) { + return NULL; + } + + builder->rows = taosArrayInit(1, sizeof(SMemRow)); + if (!builder->rows) { + free(builder); + return NULL; + } + + builder->metadata = calloc(1, sizeof(SSubmitBlk)); + if (!builder->metadata) { + taosArrayDestroy(&builder->rows); + free(builder); + return NULL; + } + memcpy(builder->metadata, metadata, sizeof(SSubmitBlk)); + + return builder; +} + +/** + * Destroy the SSubmitBlkBuilder. + * + * @param builder + */ +void destroySSubmitBlkBuilder(SSubmitBlkBuilder* builder) { + if (!builder) { + return; + } + taosArrayDestroy(&builder->rows); + free(builder->metadata); + free(builder); +} + +/** + * Append a SSubmitBlk* to the builder. The table uid in pBlock must be the same with the builder's. + * + * @param builder the SSubmitBlkBuilder. + * @param pBlock the pBlock to append. + * @return whether the append is success. + */ +static bool appendSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk *pBlock) { + assert(pBlock->uid == builder->metadata->uid); + assert(pBlock->schemaLen == 0); + + char* pRow = pBlock->data; + char* pEnd = pBlock->data + htonl(pBlock->dataLen); + while (pRow < pEnd) { + if (!taosArrayPush(builder->rows, &pRow)) { + return false; + } + pRow += memRowTLen(pRow); } + + return true; } /** - * Merge the KV-PayLoad SQL objects into single one. (the statements here must be an insertion statement). + * A util function to sort SArray by key. + */ +static int32_t compareSMemRow(const void *x, const void *y) { + TSKEY left = memRowKey(*(void **) x); + TSKEY right = memRowKey(*(void **) y); + if (left == right) { + return 0; + } else { + return left > right ? 1 : -1; + } +} + +/** + * Build and write SSubmitBlk to `target` + * + * @param builder the SSubmitBlkBuilder. + * @param target the target to write. + * @return the writen bytes. + */ +static size_t writeSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* target) { + memcpy(target, builder->metadata, sizeof(SSubmitBlk)); + + uint32_t dataLen = 0; + taosArraySort(builder->rows, compareSMemRow); + for (int i = 0; i < taosArrayGetSize(builder->rows); ++i) { + char* pRow = *(char**) (taosArrayGet(builder->rows, i)); + memcpy(POINTER_SHIFT(target->data, dataLen), pRow, memRowTLen(pRow)); + dataLen += memRowTLen(pRow); + } + + target->schemaLen = 0; + target->dataLen = htonl(dataLen); + target->numOfRows = htons(taosArrayGetSize(builder->rows)); + + return dataLen + sizeof(SSubmitBlk); +} + +/** + * Get the expected writen bytes of `writeSSubmitBlkBuilder`. + * + * @param builder the SSubmitBlkBuilder. + * @return the expected writen bytes of `writeSSubmitBlkBuilder`. + */ +static size_t writenSizeSSubmitBlkBuilder(SSubmitBlkBuilder* builder) { + size_t dataLen = 0; + for (int i = 0; i < taosArrayGetSize(builder->rows); ++i) { + char* pRow = *(char**) (taosArrayGet(builder->rows, i)); + dataLen += memRowTLen(pRow); + } + return dataLen + sizeof(SSubmitBlk); +} + +/** + * The builder to build SSubmitMsg::blocks. + */ +typedef struct SSubmitMsgBlocksBuilder { + // SHashObj. + SHashObj* blockBuilders; + int64_t vgId; +} SSubmitMsgBlocksBuilder; + +/** + * Create a SSubmitMsgBuilder. + * + * @param vgId the vgId of SSubmitMsg. + * @return the SSubmitMsgBuilder. + */ +static SSubmitMsgBlocksBuilder* createSSubmitMsgBuilder(int64_t vgId) { + SSubmitMsgBlocksBuilder* builder = calloc(1, sizeof(SSubmitMsgBlocksBuilder)); + if (!builder) { + return NULL; + } + builder->vgId = vgId; + + builder->blockBuilders = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + if (!builder->blockBuilders) { + free(builder); + return NULL; + } + return builder; +} + +/** + * Get the expected writen bytes of `writeSSubmitMsgBlocksBuilder`. + * + * @param builder the SSubmitMsgBlocksBuilder. + * @return the expected writen bytes of `writeSSubmitMsgBlocksBuilder`. + */ +static size_t writenSizeSSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder) { + size_t allocSize = 0; + SSubmitBlkBuilder** iter = taosHashIterate(builder->blockBuilders, NULL); + while (iter) { + SSubmitBlkBuilder* blocksBuilder = *iter; + allocSize += writenSizeSSubmitBlkBuilder(blocksBuilder); + iter = taosHashIterate(builder->blockBuilders, iter); + } + return allocSize; +} + +/** + * Build and write SSubmitMsg::blocks to `pBlocks` + * + * @param builder the SSubmitBlkBuilder. + * @param pBlocks the target to write. + * @return the writen bytes. + */ +size_t writeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks) { + size_t nWrite = 0; + SSubmitBlkBuilder** iter = taosHashIterate(builder->blockBuilders, NULL); + while (iter) { + SSubmitBlkBuilder* blocksBuilder = *iter; + SSubmitBlk* pBlock = POINTER_SHIFT(pBlocks, nWrite); + nWrite += writeSSubmitBlkBuilder(blocksBuilder, pBlock); + iter = taosHashIterate(builder->blockBuilders, iter); + } + return nWrite; +} + +/** + * Destroy the SSubmitMsgBlocksBuilder. + * + * @param builder the SSubmitMsgBlocksBuilder to destroy. + */ +static void destroySSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder) { + if (!builder) { + return; + } + + SSubmitBlkBuilder** iter = taosHashIterate(builder->blockBuilders, NULL); + while (iter) { + destroySSubmitBlkBuilder(*iter); + iter = taosHashIterate(builder->blockBuilders, iter); + } + taosHashCleanup(builder->blockBuilders); +} + +/** + * If the SSubmitBlkBuilder of pBlock->uid is present, returns it. Otherwise, build a new SSubmitBlkBuilder. + * + * @param builder the SSubmitMsgBlocksBuilder. + * @param pBlock the SSubmitBlk. + * @return the SSubmitBlkBuilder (NULL means failure). + */ +static SSubmitBlkBuilder* computeIfAbsentSSubmitBlkBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlock) { + SSubmitBlkBuilder** iter = taosHashGet(builder->blockBuilders, &pBlock->uid, sizeof(pBlock->uid)); + SSubmitBlkBuilder* blocksBuilder = NULL; + if (iter) { + return *iter; + } + + blocksBuilder = createSSubmitBlkBuilder(pBlock); + if (!blocksBuilder) { + return NULL; + } + + if (taosHashPut(builder->blockBuilders, &pBlock->uid, sizeof(pBlock->uid), &blocksBuilder, sizeof(SArray*))) { + destroySSubmitBlkBuilder(blocksBuilder); + return NULL; + } + + return blocksBuilder; +} +/** + * Append SSubmitMsg* to the SSubmitMsgBlocksBuilder. + * + * @param builder the SSubmitMsgBlocksBuilder. + * @param pMsg the SSubmitMsg* + * @param numOfBlocks the number of blocks in SSubmitMsg. + * @return whether the append is success. + */ +static bool appendSSubmitMsgBlocks(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t numOfBlocks) { + SSubmitBlk* pBlock = pBlocks; + for (size_t i = 0; i < numOfBlocks; ++i) { + assert(pBlock->schemaLen == 0); + SSubmitBlkBuilder* blocksBuilder = computeIfAbsentSSubmitBlkBuilder(builder, pBlock); + if (!blocksBuilder) { + return false; + } + + if (!appendSSubmitBlkBuilder(blocksBuilder, pBlock)) { + return false; + } + + size_t blockSize = sizeof (SSubmitBlk) + htonl(pBlock->dataLen); + pBlock = POINTER_SHIFT(pBlock, blockSize); + } + return true; +} + +/** + * Merge the KV-PayLoad SQL objects into single one. + * The statements here must be an insertion statement and no schema attached. * * @param statements the array of statements. a.k.a SArray. * @param result the returned result. result is not null! @@ -2229,7 +2518,7 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) { } // a.k.a SHashObj, the key value represents vgroup id. - SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); if (!pVnodeDataBlockHashList) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -2242,6 +2531,7 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) { // initialize the `pVnodeDataBlockHashList`. assert(pMergeInsertParam->payloadType == PAYLOAD_TYPE_KV); + assert(!pMergeInsertParam->schemaAttached); for (int i = 0; i < taosArrayGetSize(pMergeInsertParam->pDataBlocks); ++i) { STableDataBlocks *pDataBlocks = *((STableDataBlocks** )taosArrayGet(pMergeInsertParam->pDataBlocks, i)); if (taosHashPut(pVnodeDataBlockHashList, &pDataBlocks->vgId, sizeof(pDataBlocks->vgId), &pDataBlocks, sizeof(STableDataBlocks *))) { @@ -2257,17 +2547,12 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) { SInsertStatementParam* pInsertParam = &pCmd->insertParam; assert(pInsertParam->payloadType == PAYLOAD_TYPE_KV); + assert(!pInsertParam->schemaAttached); // merge all the data blocks by vgroup id. for (int j = 0; pInsertParam->pDataBlocks && j < taosArrayGetSize(pInsertParam->pDataBlocks); ++j) { STableDataBlocks * tableBlock = *((STableDataBlocks **) taosArrayGet(pInsertParam->pDataBlocks, j)); - SSubmitBlk *pBlocks = (SSubmitBlk *)tableBlock->pData; - - // skip the empty data block. - if (pBlocks->numOfRows <= 0) { - tscDebug("0x%" PRIx64 " table %s data block is empty", pInsertParam->objectId, tableBlock->tableName.tname); - continue; - } + // SSubmitMsg *pBlocks = (SSubmitMsg *)tableBlock->pData; // get the data blocks of vgroup id. STableDataBlocks *dataBuf = NULL; @@ -2286,27 +2571,31 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) { } else { dataBuf = *iter; } + + // header: SMsgDesc + SSubmitMsg(without SSubmitBlk[]) + assert(dataBuf->headerSize == (sizeof(SMsgDesc) + sizeof(SSubmitMsg))); + assert(dataBuf->headerSize == tableBlock->headerSize); + const size_t headerSize = tableBlock->headerSize; + const size_t destSize = dataBuf->size + (tableBlock->size - tableBlock->headerSize); - // the allocated size is too small. - int64_t destSize = dataBuf->size + (tableBlock->size - tableBlock->headerSize); - if (dataBuf->nAllocSize < destSize) { - dataBuf->nAllocSize = (uint32_t)(destSize * 1.5); - char *tmp = realloc(dataBuf->pData, dataBuf->nAllocSize); - if (tmp != NULL) { - dataBuf->pData = tmp; - } else { // failed to allocate memory, free already allocated memory and return error code - tscError("0x%" PRIx64 " failed to allocate memory for merging submit block, size:%d", pInsertParam->objectId, - dataBuf->nAllocSize); + if (!resizeDataBlocksData(dataBuf, debugFlag)) { + tscError("0x%" PRIx64 " failed to allocate memory for merging submit block, size:%d", pInsertParam->objectId, + dataBuf->nAllocSize); - taosHashCleanup(pVnodeDataBlockHashList); - tfree(dataBuf->pData); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } + taosHashCleanup(pVnodeDataBlockHashList); + tfree(dataBuf->pData); + return TSDB_CODE_TSC_OUT_OF_MEMORY; } + + // pData = SMsgDesc + SSubmitMsg(with SSubmitBlk[]) + SSubmitBlk* target = (SSubmitBlk* ) (dataBuf->pData + headerSize); + SSubmitBlk* source = (SSubmitBlk* ) (tableBlock->pData + headerSize); - // copy the data into vgroup data blocks. - memcpy(dataBuf->pData + dataBuf->size, tableBlock->pData + tableBlock->headerSize, tableBlock->size - tableBlock->headerSize); - dataBuf->size += tableBlock->size - tableBlock->headerSize; + const size_t targetSize = dataBuf->size - headerSize; + const size_t sourceSize = tableBlock->size - headerSize; + + memcpy(POINTER_SHIFT(target, targetSize), source, sourceSize); + dataBuf->size = destSize; dataBuf->numOfTables += tableBlock->numOfTables; tscDestroyDataBlock(pSql, tableBlock, false); } @@ -2319,6 +2608,36 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) { // clean up. taosHashCleanup(pVnodeDataBlockHashList); *result = merged; + + // rebuild SubmitMsg::blocks. + for (int i = 0; i < taosArrayGetSize(pMergeDataBlocks); ++i) { + STableDataBlocks* pDataBlocks = *((STableDataBlocks **) taosArrayGet(pMergeDataBlocks, i)); + SSubmitMsgBlocksBuilder *builder = createSSubmitMsgBuilder(pDataBlocks->vgId); + if (!builder) { + break ; + } + + SSubmitBlk* pBlock = (SSubmitBlk *) (pDataBlocks->pData + pDataBlocks->headerSize); + if (!appendSSubmitMsgBlocks(builder, pBlock, pDataBlocks->numOfTables)) { + destroySSubmitMsgBuilder(builder); + break; + } + + size_t nAllocSize = pDataBlocks->headerSize + writenSizeSSubmitMsgBuilder(builder); + char *pData = calloc(1, nAllocSize); + if (!pData) { + destroySSubmitMsgBuilder(builder); + break; + } + + pDataBlocks->nAllocSize = nAllocSize; + pDataBlocks->size = pDataBlocks->headerSize + writeSSubmitMsgBlocksBuilder(builder, (SSubmitBlk*)(pData + pDataBlocks->headerSize)); + memcpy(pData, pDataBlocks->pData, pDataBlocks->headerSize); + free(pDataBlocks->pData); + pDataBlocks->pData = pData; + + destroySSubmitMsgBuilder(builder); + } return TSDB_CODE_SUCCESS; } @@ -2326,7 +2645,11 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg); int code = 0; bool isRawPayload = IS_RAW_PAYLOAD(pInsertParam->payloadType); - void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + size_t initialSize = taosHashGetSize(pInsertParam->pTableBlockHashList); + initialSize = initialSize > 128 ? 128 : initialSize; + + void* pVnodeDataBlockHashList = taosHashInit(initialSize, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); + SArray* pTableDataBlockList = taosArrayInit(taosHashGetSize(pInsertParam->pTableBlockHashList), POINTER_BYTES); SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); STableDataBlocks** p = taosHashIterate(pInsertParam->pTableBlockHashList, NULL); @@ -2337,6 +2660,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar while(pOneTableBlock) { SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; + taosArrayPush(pTableDataBlockList, &pOneTableBlock); if (pBlocks->numOfRows > 0) { // the maximum expanded size in byte when a row-wise data is converted to SDataRow format int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0; @@ -2347,6 +2671,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar if (ret != TSDB_CODE_SUCCESS) { tscError("0x%"PRIx64" failed to prepare the data block buffer for merging table data, code:%d", pInsertParam->objectId, ret); taosHashCleanup(pVnodeDataBlockHashList); + taosArrayDestroy(&pTableDataBlockList); tscDestroyBlockArrayList(pSql, pVnodeDataBlockList); tfree(blkKeyInfo.pKeyTuple); return ret; @@ -2431,15 +2756,22 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar pOneTableBlock = *p; } - extractTableNameList(pSql, pInsertParam); + extractTableNameList(pSql, pInsertParam, pTableDataBlockList); - if (freeBlockMap) { - pInsertParam->pTableBlockHashList = tscDestroyBlockHashTable(pSql, pInsertParam->pTableBlockHashList, false); + if (freeBlockMap && pInsertParam->pTableBlockHashList) { + taosHashCleanup(pInsertParam->pTableBlockHashList); + pInsertParam->pTableBlockHashList = NULL; + + for (int i = 0; i < taosArrayGetSize(pTableDataBlockList); ++i) { + STableDataBlocks* pDataBlocks = *((STableDataBlocks **) taosArrayGet(pTableDataBlockList, i)); + tscDestroyDataBlock(pSql, pDataBlocks, false); + } } // free the table data blocks; pInsertParam->pDataBlocks = pVnodeDataBlockList; taosHashCleanup(pVnodeDataBlockHashList); + taosArrayDestroy(&pTableDataBlockList); tfree(blkKeyInfo.pKeyTuple); return TSDB_CODE_SUCCESS;