提交 8832b4b1 编写于 作者: Z zhihaop

feat: optimize the performance of tscMergeTableDataBlocks

上级 207b1217
......@@ -2198,20 +2198,6 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
return result;
}
static void extractTableNameList(SInsertStatementParam *pInsertParam, SArray* pTableDataBlockList) {
pInsertParam->numOfTables = (int32_t) taosArrayGetSize(pTableDataBlockList);
if (!pInsertParam->pTableNameList) {
pInsertParam->pTableNameList = calloc(pInsertParam->numOfTables, sizeof(SName*));
}
for (int i = 0; i < taosArrayGetSize(pTableDataBlockList); ++i) {
STableDataBlocks* pBlocks = taosArrayGetP(pTableDataBlockList, i);
//tfree(pInsertParam->pTableNameList[i]);
pInsertParam->pTableNameList[i] = tNameDup(&pBlocks->tableName);
}
}
/**
* A builder of SSubmitBlk.
*/
......@@ -2922,18 +2908,25 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar
void* pVnodeDataBlockHashList = taosHashInit(initialSize, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
// speed up hash iteration.
SArray* pTableDataBlockList = taosArrayInit(taosHashGetSize(pInsertParam->pTableBlockHashList), POINTER_BYTES);
// alloc table name list.
size_t numOfTables = taosHashGetSize(pInsertParam->pTableBlockHashList);
if (pInsertParam->pTableNameList) {
tfree(pInsertParam->pTableNameList);
}
pInsertParam->pTableNameList = calloc(numOfTables, sizeof(SName*));
pInsertParam->numOfTables = (int32_t) numOfTables;
STableDataBlocks** p = taosHashIterate(pInsertParam->pTableBlockHashList, NULL);
STableDataBlocks* pOneTableBlock = *p;
size_t tail = 0;
SBlockKeyInfo blkKeyInfo = {0}; // share by pOneTableBlock
while(pOneTableBlock) {
STableDataBlocks** iter = taosHashIterate(pInsertParam->pTableBlockHashList, NULL);
while (iter) {
STableDataBlocks* pOneTableBlock = *iter;
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
taosArrayPush(pTableDataBlockList, &pOneTableBlock);
iter = taosHashIterate(pInsertParam->pTableBlockHashList, iter);
// extract table name list.
pInsertParam->pTableNameList[tail++] = tNameDup(&pOneTableBlock->tableName);
if (pBlocks->numOfRows > 0) {
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
......@@ -2944,7 +2937,6 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar
if (ret != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" failed to prepare the data block buffer for merging table data, code:%d", pInsertParam->objectId, ret);
taosHashCleanup(pVnodeDataBlockHashList);
taosArrayDestroy(&pTableDataBlockList);
tscDestroyBlockArrayList(pSql, pVnodeDataBlockList);
tfree(blkKeyInfo.pKeyTuple);
return ret;
......@@ -3017,34 +3009,23 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar
dataBuf->numOfTables += 1;
pBlocks->numOfRows = 0;
}else {
} else {
tscDebug("0x%"PRIx64" table %s data block is empty", pInsertParam->objectId, pOneTableBlock->tableName.tname);
}
p = taosHashIterate(pInsertParam->pTableBlockHashList, p);
if (p == NULL) {
break;
if (freeBlockMap) {
tscDestroyDataBlock(pSql, pOneTableBlock, false);
}
pOneTableBlock = *p;
}
extractTableNameList(pInsertParam, pTableDataBlockList);
if (freeBlockMap && pInsertParam->pTableBlockHashList) {
if (freeBlockMap) {
taosHashCleanup(pInsertParam->pTableBlockHashList);
pInsertParam->pTableBlockHashList = NULL;
for (int i = 0; i < taosArrayGetSize(pTableDataBlockList); ++i) {
STableDataBlocks* pDataBlocks = *((STableDataBlocks **) taosArrayGet(pTableDataBlockList, i));
tscDestroyDataBlock(pSql, pDataBlocks, false);
}
}
// free the table data blocks;
pInsertParam->pDataBlocks = pVnodeDataBlockList;
taosHashCleanup(pVnodeDataBlockHashList);
taosArrayDestroy(&pTableDataBlockList);
tfree(blkKeyInfo.pKeyTuple);
return TSDB_CODE_SUCCESS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册