diff --git a/src/client/src/tscBulkWrite.c b/src/client/src/tscBulkWrite.c index 509bedb0f4e0b52d2d3095222591a820b73069c4..b9993036599014af3a4dac63599dde3f265e7838 100644 --- a/src/client/src/tscBulkWrite.c +++ b/src/client/src/tscBulkWrite.c @@ -100,7 +100,7 @@ static void batchResultCallback(void* param, TAOS_RES* tres, int32_t code) { } taosReleaseRef(tscObjRef, res->self); - free(param); + free(context); } int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) { @@ -124,30 +124,32 @@ int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) { // initialize the callback context. context->count = count; for (size_t i = 0; i < count; ++i) { - SSqlObj* statement = *((SSqlObj**)taosArrayGet(statements, i)); + SSqlObj* statement = taosArrayGetP(statements, i); context->runnable[i].fp = statement->fp; context->runnable[i].param = statement->param; } // merge the statements into single one. tscDebug("start to merge %zu sql objs", count); - SSqlObj *pSql = *((SSqlObj**)taosArrayGet(statements, 0)); - SSqlObj *pNew = createSimpleSubObj(pSql, batchResultCallback, context, TSDB_SQL_INSERT); - if (!pNew) { - free(context); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - int32_t code = tscMergeKVPayLoadSqlObj(statements, pNew); + SSqlObj *pFirst = taosArrayGetP(statements, 0); + int32_t code = tscMergeKVPayLoadSqlObj(statements, pFirst); if (code != TSDB_CODE_SUCCESS) { const char* msg = tstrerror(code); tscDebug("failed to merge sql objects: %s", msg); free(context); - taosReleaseRef(tscObjRef, pNew->self); + taosReleaseRef(tscObjRef, pFirst->self); return code; } - *result = pNew; + pFirst->fp = batchResultCallback; + pFirst->param = context; + pFirst->fetchFp = pFirst->fp; + *result = pFirst; + + for (int i = 1; i < count; ++i) { + SSqlObj *pSql = taosArrayGetP(statements, i); + taosReleaseRef(tscObjRef, pSql->self); + } return code; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index d18e48eff46d5896678e1d5f63c61d3a8384afb8..aa28d01e5a764f9665c97792afd31056dc4d5174 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2198,14 +2198,14 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { return result; } -static void extractTableNameList(SSqlObj *pSql, SInsertStatementParam *pInsertParam, SArray* pTableDataBlockList) { - pInsertParam->numOfTables = (int32_t) taosHashGetSize(pInsertParam->pTableBlockHashList); - if (pInsertParam->pTableNameList == NULL) { - pInsertParam->pTableNameList = malloc(pInsertParam->numOfTables * POINTER_BYTES); +static void extractTableNameList(SInsertStatementParam *pInsertParam, SArray* pTableDataBlockList) { + pInsertParam->numOfTables = (int32_t) taosArrayGetSize(pTableDataBlockList); + if (!pInsertParam->pTableNameList) { + pInsertParam->pTableNameList = calloc(pInsertParam->numOfTables, sizeof(SName*)); } for (int i = 0; i < taosArrayGetSize(pTableDataBlockList); ++i) { - STableDataBlocks* pBlocks = *((STableDataBlocks**) taosArrayGet(pTableDataBlockList, i)); + STableDataBlocks* pBlocks = taosArrayGetP(pTableDataBlockList, i); //tfree(pInsertParam->pTableNameList[i]); pInsertParam->pTableNameList[i] = tNameDup(&pBlocks->tableName); @@ -2406,6 +2406,7 @@ size_t writeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk nWrite += writeSSubmitBlkBuilder(blocksBuilder, pBlock); iter = taosHashIterate(builder->blockBuilders, iter); } + return nWrite; } @@ -2664,7 +2665,7 @@ static bool appendSTableDataBlocksListBuilder(STableDataBlocksListBuilder* build * @param numOfTables the number of tables. * @return the vnode data blocks list. */ -static SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, int32_t* numOfTables) { +static SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, size_t* numOfTables) { SArray* pVnodeDataBlockList = taosArrayInit(taosHashGetSize(builder->dataBlocksBuilders), sizeof(STableDataBlocks*)); if (!pVnodeDataBlockList) { return NULL; @@ -2677,13 +2678,11 @@ static SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* bui if (!dataBlocks) { goto error; } - - numOfTables += dataBlocks->numOfTables; + *numOfTables += dataBlocks->numOfTables; taosArrayPush(pVnodeDataBlockList, &dataBlocks); iter = taosHashIterate(builder->dataBlocksBuilders, iter); } - return pVnodeDataBlockList; error: @@ -2695,6 +2694,119 @@ error: return NULL; } +/** + * A Builder to build SInsertStatementParam::pTableNameList. + */ +typedef struct STableNameListBuilder { + SArray* tableNames; +} STableNameListBuilder; + +/** + * Create STableNameListBuilder. + */ +STableNameListBuilder* createSTableNameListBuilder() { + STableNameListBuilder* builder = calloc(1, sizeof(STableNameListBuilder)); + if (!builder) { + return NULL; + } + + builder->tableNames = taosArrayInit(1, sizeof(SName*)); + if (!builder->tableNames) { + free(builder); + return NULL; + } + + return builder; +} + +/** + * Destroy the STableNameListBuilder. + * @param builder the STableNameListBuilder. + */ +void destroySTableNameListBuilder(STableNameListBuilder* builder) { + if (!builder) { + return; + } + taosArrayDestroy(&builder->tableNames); + free(builder); +} + +/** + * A util function to compare two SName. + */ +static int32_t compareSName(const void *x, const void *y) { + SName* left = *((SName **) x); + SName* right = *((SName **) y); + if (left == right) { + return 0; + } + return strncmp((const char*) left, (const char*) right, sizeof(SName)); +} + +/** + * Insert a SName to builder. + * + * @param builder the STableNameListBuilder. + * @param name the table name. + * @return whether it is success. + */ +bool insertSTableNameListBuilder(STableNameListBuilder* builder, SName* name) { + return taosArrayPush(builder->tableNames, &name); +} + +/** + * Build the STable name list. + * + * @param builder the STableNameListBuilder. + * @param numOfTables the number of table. + * @return the STable name list. + */ +SName** buildSTableNameListBuilder(STableNameListBuilder* builder, size_t* numOfTables) { + if (!taosArrayGetSize(builder->tableNames)) { + *numOfTables = 0; + return NULL; + } + + // sort and unique. + taosArraySort(builder->tableNames, compareSName); + size_t tail = 0; + for (size_t i = 1; i < taosArrayGetSize(builder->tableNames); ++i) { + SName* last = taosArrayGetP(builder->tableNames, tail); + SName* current = taosArrayGetP(builder->tableNames, i); + if (compareSName(last, current) != 0) { + ++tail; + taosArraySet(builder->tableNames, tail, ¤t); + } + } + + // build tableNameList + SName** tableNames = calloc(tail + 1, sizeof(SName*)); + if (!tableNames) { + return NULL; + } + + for (size_t i = 0; i <= tail; ++i) { + SName* clone = malloc(sizeof(SName)); + if (!clone) { + goto error; + } + memcpy(clone, taosArrayGetP(builder->tableNames, i), sizeof(SName)); + tableNames[i] = clone; + } + + *numOfTables = tail + 1; + return tableNames; + +error: + for (size_t i = 0; i <= tail; ++i) { + if (tableNames[i]) { + free(tableNames[i]); + } + } + free(tableNames); + return NULL; +} + /** * Merge the KV-PayLoad SQL objects into single one. * The statements here must be an insertion statement and no schema attached. @@ -2714,9 +2826,15 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } + STableNameListBuilder* nameListBuilder = createSTableNameListBuilder(); + if (!nameListBuilder) { + destroySTableDataBlocksListBuilder(builder); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + // append the existing data blocks to builder. - for (int i = 0; i < taosArrayGetSize(statements); ++i) { - SSqlObj *pSql = *((SSqlObj**) taosArrayGet(statements, i)); + for (size_t i = 0; i < taosArrayGetSize(statements); ++i) { + SSqlObj *pSql = taosArrayGetP(statements, i); SInsertStatementParam* pInsertParam = &pSql->cmd.insertParam; if (!pInsertParam->pDataBlocks) { continue; @@ -2725,32 +2843,72 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result) { assert(pInsertParam->payloadType == PAYLOAD_TYPE_KV); assert(!pInsertParam->schemaAttached); - for (int j = 0; j < taosArrayGetSize(pInsertParam->pDataBlocks); ++j) { - STableDataBlocks * tableBlock = *((STableDataBlocks **) taosArrayGet(pInsertParam->pDataBlocks, j)); + // append each vnode data block to the builder. + for (size_t j = 0; j < taosArrayGetSize(pInsertParam->pDataBlocks); ++j) { + STableDataBlocks * tableBlock = taosArrayGetP(pInsertParam->pDataBlocks, j); if (!appendSTableDataBlocksListBuilder(builder, tableBlock)) { destroySTableDataBlocksListBuilder(builder); + destroySTableNameListBuilder(nameListBuilder); return TSDB_CODE_TSC_OUT_OF_MEMORY; } + + for (int k = 0; k < pInsertParam->numOfTables; ++k) { + if (!insertSTableNameListBuilder(nameListBuilder, pInsertParam->pTableNameList[k])) { + destroySTableDataBlocksListBuilder(builder); + destroySTableNameListBuilder(nameListBuilder); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + } } } // build the vnode data blocks. - int32_t numOfTables = 0; + size_t numOfBlocks = 0; SInsertStatementParam* pInsertParam = &result->cmd.insertParam; - SArray* pVnodeDataBlocksList = buildSTableDataBlocksListBuilder(builder, &numOfTables); + SArray* pVnodeDataBlocksList = buildSTableDataBlocksListBuilder(builder, &numOfBlocks); if (!pVnodeDataBlocksList) { destroySTableDataBlocksListBuilder(builder); + destroySTableNameListBuilder(nameListBuilder); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + // build the table name list. + size_t numOfTables = 0; + SName** pTableNameList = buildSTableNameListBuilder(nameListBuilder, &numOfTables); + if (!pTableNameList) { + destroySTableDataBlocksListBuilder(builder); + destroySTableNameListBuilder(nameListBuilder); return TSDB_CODE_TSC_OUT_OF_MEMORY; } + if (numOfTables != numOfBlocks) { + printf("numOfTables=%zu, numOfBlocks=%zu\n", numOfTables, numOfBlocks); + } + assert(numOfTables == numOfBlocks); + + // replace table name list. + if (pInsertParam->pTableNameList) { + for (size_t i = 0; i < pInsertParam->numOfTables; ++i) { + if (pInsertParam->pTableNameList[i]) { + free(pInsertParam->pTableNameList[i]); + } + } + free(pInsertParam->pTableNameList); + } + pInsertParam->pTableNameList = pTableNameList; + pInsertParam->numOfTables = (int32_t) numOfTables; + + // replace vnode data blocks. + if (pInsertParam->pDataBlocks) { + for (size_t i = 0; i < taosArrayGetSize(pInsertParam->pDataBlocks); ++i) { + tscDestroyDataBlock(result, taosArrayGetP(pInsertParam->pDataBlocks, i), false); + } + taosArrayDestroy(&pInsertParam->pDataBlocks); + } pInsertParam->pDataBlocks = pVnodeDataBlocksList; - pInsertParam->numOfTables = numOfTables; // clean up. - for (int i = 0; i < taosArrayGetSize(statements); ++i) { - SSqlObj *pSql = *((SSqlObj**) taosArrayGet(statements, i)); - taosReleaseRef(tscObjRef, pSql->self); - } destroySTableDataBlocksListBuilder(builder); + destroySTableNameListBuilder(nameListBuilder); return TSDB_CODE_SUCCESS; } @@ -2869,7 +3027,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar pOneTableBlock = *p; } - extractTableNameList(pSql, pInsertParam, pTableDataBlockList); + extractTableNameList(pInsertParam, pTableDataBlockList); if (freeBlockMap && pInsertParam->pTableBlockHashList) { taosHashCleanup(pInsertParam->pTableBlockHashList);