提交 8e0312e9 编写于 作者: Z zhihaop

enh: refactor tscMergeKVPayLoadSqlObj and improve performace of sqlobj merge

上级 bd874dd0
......@@ -149,7 +149,7 @@ void* tscDestroyUdfArrayList(SArray* pUdfList);
void* tscDestroyBlockHashTable(SSqlObj* pSql, SHashObj* pBlockHashTable, bool removeMeta);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result);
int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result);
int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap);
int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, SName* pName, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks, SArray* pBlockList);
......
......@@ -131,17 +131,23 @@ int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) {
// merge the statements into single one.
tscDebug("start to merge %zu sql objs", count);
int32_t code = tscMergeKVPayLoadSqlObj(statements, result);
SSqlObj *pSql = *((SSqlObj**)taosArrayGet(statements, 0));
SSqlObj *pNew = createSimpleSubObj(pSql, batchResultCallback, context, TSDB_SQL_INSERT);
if (!pNew) {
free(context);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t code = tscMergeKVPayLoadSqlObj(statements, pNew);
if (code != TSDB_CODE_SUCCESS) {
const char* msg = tstrerror(code);
tscDebug("failed to merge sql objects: %s", msg);
free(context);
} else {
// set the merged sql object callback.
(*result)->fp = batchResultCallback;
(*result)->fetchFp = (*result)->fp;
(*result)->param = context;
taosReleaseRef(tscObjRef, pNew->self);
return code;
}
*result = pNew;
return code;
}
......
......@@ -2212,29 +2212,6 @@ static void extractTableNameList(SSqlObj *pSql, SInsertStatementParam *pInsertPa
}
}
/**
* Resize the the data blocks data.
*
* @param dataBlocks the data blocks.
* @param destSize the destination size.
* @return whether is success.
*/
static bool resizeDataBlocksData(STableDataBlocks* dataBlocks, size_t destSize) {
if (dataBlocks->nAllocSize >= destSize) {
return true;
}
size_t nAllocSize = destSize + (destSize >> 1);
char *pData = realloc(dataBlocks->pData, nAllocSize);
if (!pData) {
return false;
}
dataBlocks->pData = pData;
dataBlocks->nAllocSize = nAllocSize;
return true;
}
/**
* A builder of SSubmitBlk.
*/
......@@ -2279,7 +2256,7 @@ SSubmitBlkBuilder* createSSubmitBlkBuilder(SSubmitBlk* metadata) {
/**
* Destroy the SSubmitBlkBuilder.
*
* @param builder
* @param builder the SSubmitBlkBuilder.
*/
void destroySSubmitBlkBuilder(SSubmitBlkBuilder* builder) {
if (!builder) {
......@@ -2432,6 +2409,15 @@ size_t writeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk
return nWrite;
}
/**
* Get the number of block in SSubmitMsgBlocksBuilder.
* @param builder the SSubmitMsgBlocksBuilder.
* @return the number of SSubmitBlk block.
*/
size_t blockSizeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder) {
return taosHashGetSize(builder->blockBuilders);
}
/**
* Destroy the SSubmitMsgBlocksBuilder.
*
......@@ -2477,6 +2463,7 @@ static SSubmitBlkBuilder* computeIfAbsentSSubmitBlkBuilder(SSubmitMsgBlocksBuild
return blocksBuilder;
}
/**
* Append SSubmitMsg* to the SSubmitMsgBlocksBuilder.
*
......@@ -2504,6 +2491,210 @@ static bool appendSSubmitMsgBlocks(SSubmitMsgBlocksBuilder* builder, SSubmitBlk*
return true;
}
/**
* STableDataBlocksBuilder is a tool to build data blocks by append the existing data blocks in a vnode.
*/
typedef struct STableDataBlocksBuilder {
SSubmitMsgBlocksBuilder* blocksBuilder;
STableDataBlocks* firstBlock;
int64_t vgId;
} STableDataBlocksBuilder;
/**
* Create the STableDataBlocksBuilder.
*
* @param vgId the vgId of STableDataBlocksBuilder.
* @return the STableDataBlocksBuilder.
*/
static STableDataBlocksBuilder* createSTableDataBlocksBuilder(int64_t vgId) {
STableDataBlocksBuilder* builder = calloc(1, sizeof(STableDataBlocksBuilder));
if (!builder) {
return NULL;
}
builder->blocksBuilder = createSSubmitMsgBuilder(vgId);
if (!builder->blocksBuilder) {
free(builder);
return NULL;
}
builder->vgId = vgId;
builder->firstBlock = NULL;
return builder;
}
/**
* Destroy the STableDataBlocksBuilder.
* @param builder the STableDataBlocksBuilder.
*/
static void destroySTableDataBlocksBuilder(STableDataBlocksBuilder *builder) {
if (!builder) {
return;
}
destroySSubmitMsgBuilder(builder->blocksBuilder);
free(builder);
}
/**
* Append a data blocks to STableDataBlocksBuilder.
* @param builder the STableDataBlocksBuilder.
* @param dataBlocks the dataBlocks to append. the vgId of dataBlocks must be same with the STableDataBlocksBuilder.
* @return whether the append is success.
*/
static bool appendSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, STableDataBlocks* dataBlocks) {
if (!dataBlocks || dataBlocks->vgId != builder->vgId) {
return false;
}
if (!builder->firstBlock) {
builder->firstBlock = dataBlocks;
}
SSubmitBlk* pBlocks = (SSubmitBlk *)(dataBlocks->pData + dataBlocks->headerSize);
return appendSSubmitMsgBlocks(builder->blocksBuilder, pBlocks, dataBlocks->numOfTables);
}
/**
* Build the data blocks for single vnode.
* @param builder the STableDataBlocksBuilder.
* @return the data blocks for single vnode.
*/
static STableDataBlocks* buildSTableDataBlocksBuilder(STableDataBlocksBuilder* builder) {
SSubmitMsgBlocksBuilder* blocksBuilder = builder->blocksBuilder;
STableDataBlocks *firstBlock = builder->firstBlock;
if (!firstBlock) {
return NULL;
}
size_t nWriteSize = writenSizeSSubmitMsgBuilder(builder->blocksBuilder);
size_t nHeaderSize = firstBlock->headerSize;
size_t nAllocSize = nWriteSize + nHeaderSize;
STableDataBlocks* dataBlocks = NULL;
int32_t code = tscCreateDataBlock(nAllocSize, 0, nHeaderSize, &firstBlock->tableName, firstBlock->pTableMeta, &dataBlocks);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
dataBlocks->size = nHeaderSize;
memcpy(dataBlocks->pData, firstBlock->pData, nHeaderSize);
dataBlocks->size += writeSSubmitMsgBlocksBuilder(blocksBuilder, (SSubmitBlk *) (dataBlocks->pData + nHeaderSize));
dataBlocks->numOfTables = blockSizeSSubmitMsgBlocksBuilder(blocksBuilder);
return dataBlocks;
}
/**
* STableDataBlocksListBuilder is a tool to build vnode data blocks list by appending exist data blocks.
*/
typedef struct STableDataBlocksListBuilder {
SHashObj* dataBlocksBuilders;
} STableDataBlocksListBuilder;
/**
* Create the STableDataBlocksListBuilder.
*
* @return the STableDataBlocksListBuilder.
*/
static STableDataBlocksListBuilder* createSTableDataBlocksListBuilder() {
STableDataBlocksListBuilder* builder = calloc(1, sizeof(STableDataBlocksListBuilder));
if (!builder) {
return NULL;
}
builder->dataBlocksBuilders = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (!builder->dataBlocksBuilders) {
free(builder);
return NULL;
}
return builder;
}
/**
* Destroy the STableDataBlocksListBuilder.
*
* @param builder the STableDataBlocksListBuilder.
*/
static void destroySTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder) {
if (!builder) {
return;
}
STableDataBlocksBuilder** iter = taosHashIterate(builder->dataBlocksBuilders, NULL);
while (iter) {
destroySTableDataBlocksBuilder(*iter);
iter = taosHashIterate(builder->dataBlocksBuilders, iter);
}
taosHashCleanup(builder->dataBlocksBuilders);
free(builder);
}
/**
* Append a data blocks to STableDataBlocksListBuilder.
*
* @param builder the STableDataBlocksListBuilder.
* @param dataBlocks the data blocks.
* @return whether the append is success.
*/
static bool appendSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, STableDataBlocks* dataBlocks) {
STableDataBlocksBuilder** item = taosHashGet(builder->dataBlocksBuilders, &dataBlocks->vgId, sizeof(dataBlocks->vgId));
STableDataBlocksBuilder* blocksBuilder = NULL;
if (item) {
blocksBuilder = *item;
} else {
blocksBuilder = createSTableDataBlocksBuilder(dataBlocks->vgId);
if (!blocksBuilder) {
return false;
}
if (taosHashPut(builder->dataBlocksBuilders, &dataBlocks->vgId, sizeof(dataBlocks->vgId), &blocksBuilder, sizeof(STableDataBlocksBuilder*))) {
destroySTableDataBlocksBuilder(blocksBuilder);
return false;
}
}
return appendSTableDataBlocksBuilder(blocksBuilder, dataBlocks);
}
/**
* Build the vnode data blocks list.
*
* @param builder the STableDataBlocksListBuilder.
* @param numOfTables the number of tables.
* @return the vnode data blocks list.
*/
static SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, int32_t* numOfTables) {
SArray* pVnodeDataBlockList = taosArrayInit(taosHashGetSize(builder->dataBlocksBuilders), sizeof(STableDataBlocks*));
if (!pVnodeDataBlockList) {
return NULL;
}
STableDataBlocksBuilder** iter = taosHashIterate(builder->dataBlocksBuilders, NULL);
while (iter) {
STableDataBlocksBuilder* dataBlocksBuilder = *iter;
STableDataBlocks* dataBlocks = buildSTableDataBlocksBuilder(dataBlocksBuilder);
if (!dataBlocks) {
goto error;
}
numOfTables += dataBlocks->numOfTables;
taosArrayPush(pVnodeDataBlockList, &dataBlocks);
iter = taosHashIterate(builder->dataBlocksBuilders, iter);
}
return pVnodeDataBlockList;
error:
for (int i = 0; i < taosArrayGetSize(pVnodeDataBlockList); ++i) {
STableDataBlocks* dataBlocks = *((STableDataBlocks**)taosArrayGet(pVnodeDataBlockList, i));
tscDestroyDataBlock(NULL, dataBlocks, false);
}
taosArrayDestroy(&pVnodeDataBlockList);
return NULL;
}
/**
* Merge the KV-PayLoad SQL objects into single one.
* The statements here must be an insertion statement and no schema attached.
......@@ -2512,129 +2703,54 @@ static bool appendSSubmitMsgBlocks(SSubmitMsgBlocksBuilder* builder, SSubmitBlk*
* @param result the returned result. result is not null!
* @return the status code. usually TSDB_CODE_SUCCESS.
*/
int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) {
int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result) {
// statement array is empty.
if (!statements || !taosArrayGetSize(statements)) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
// a.k.a SHashObj<int64_t, STableDataBlocks*>, the key value represents vgroup id.
SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (!pVnodeDataBlockHashList) {
STableDataBlocksListBuilder* builder = createSTableDataBlocksListBuilder();
if (!builder) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
// let the first statement in the array to be the merged result.
SSqlObj* merged = *((SSqlObj**) taosArrayGet(statements, 0));
SSqlCmd* pMergeCmd = &merged->cmd;
SInsertStatementParam* pMergeInsertParam = &pMergeCmd->insertParam;
SArray* pMergeDataBlocks = pMergeInsertParam->pDataBlocks;
// initialize the `pVnodeDataBlockHashList`.
assert(pMergeInsertParam->payloadType == PAYLOAD_TYPE_KV);
assert(!pMergeInsertParam->schemaAttached);
for (int i = 0; i < taosArrayGetSize(pMergeInsertParam->pDataBlocks); ++i) {
STableDataBlocks *pDataBlocks = *((STableDataBlocks** )taosArrayGet(pMergeInsertParam->pDataBlocks, i));
if (taosHashPut(pVnodeDataBlockHashList, &pDataBlocks->vgId, sizeof(pDataBlocks->vgId), &pDataBlocks, sizeof(STableDataBlocks *))) {
taosHashCleanup(pVnodeDataBlockHashList);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
// merge sql obj statements[i] into sql obj `merged`.
for (int i = 1; i < taosArrayGetSize(statements); ++i) {
// append the existing data blocks to builder.
for (int i = 0; i < taosArrayGetSize(statements); ++i) {
SSqlObj *pSql = *((SSqlObj**) taosArrayGet(statements, i));
SSqlCmd *pCmd = &pSql->cmd;
SInsertStatementParam* pInsertParam = &pCmd->insertParam;
SInsertStatementParam* pInsertParam = &pSql->cmd.insertParam;
if (!pInsertParam->pDataBlocks) {
continue;
}
assert(pInsertParam->payloadType == PAYLOAD_TYPE_KV);
assert(!pInsertParam->schemaAttached);
// merge all the data blocks by vgroup id.
for (int j = 0; pInsertParam->pDataBlocks && j < taosArrayGetSize(pInsertParam->pDataBlocks); ++j) {
for (int j = 0; j < taosArrayGetSize(pInsertParam->pDataBlocks); ++j) {
STableDataBlocks * tableBlock = *((STableDataBlocks **) taosArrayGet(pInsertParam->pDataBlocks, j));
// SSubmitMsg *pBlocks = (SSubmitMsg *)tableBlock->pData;
// get the data blocks of vgroup id.
STableDataBlocks *dataBuf = NULL;
STableDataBlocks** iter = taosHashGet(pVnodeDataBlockHashList, &tableBlock->vgId, sizeof(tableBlock->vgId));
if (iter == NULL) {
dataBuf = tableBlock;
if (taosHashPut(pVnodeDataBlockHashList, &tableBlock->vgId, sizeof(tableBlock->vgId), &dataBuf, sizeof(STableDataBlocks *))) {
taosHashCleanup(pVnodeDataBlockHashList);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
if (!taosArrayPush(pMergeDataBlocks, &dataBuf)) {
taosHashCleanup(pVnodeDataBlockHashList);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
} else {
dataBuf = *iter;
}
// header: SMsgDesc + SSubmitMsg(without SSubmitBlk[])
assert(dataBuf->headerSize == (sizeof(SMsgDesc) + sizeof(SSubmitMsg)));
assert(dataBuf->headerSize == tableBlock->headerSize);
const size_t destSize = dataBuf->size + (tableBlock->size - tableBlock->headerSize);
if (!resizeDataBlocksData(dataBuf, destSize)) {
tscError("0x%" PRIx64 " failed to allocate memory for merging submit block, size:%d", pInsertParam->objectId,
dataBuf->nAllocSize);
taosHashCleanup(pVnodeDataBlockHashList);
tfree(dataBuf->pData);
if (!appendSTableDataBlocksListBuilder(builder, tableBlock)) {
destroySTableDataBlocksListBuilder(builder);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
memcpy(dataBuf->pData + dataBuf->size, tableBlock->pData + tableBlock->headerSize, tableBlock->size - tableBlock->headerSize);
dataBuf->size = destSize;
dataBuf->numOfTables += tableBlock->numOfTables;
tscDestroyDataBlock(pSql, tableBlock, false);
}
// free the data blocks and sql objs. (because it is no longer needed).
taosArrayDestroy(&pInsertParam->pDataBlocks);
}
// build the vnode data blocks.
int32_t numOfTables = 0;
SInsertStatementParam* pInsertParam = &result->cmd.insertParam;
SArray* pVnodeDataBlocksList = buildSTableDataBlocksListBuilder(builder, &numOfTables);
if (!pVnodeDataBlocksList) {
destroySTableDataBlocksListBuilder(builder);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pInsertParam->pDataBlocks = pVnodeDataBlocksList;
pInsertParam->numOfTables = numOfTables;
// clean up.
for (int i = 1; i < taosArrayGetSize(statements); ++i) {
for (int i = 0; i < taosArrayGetSize(statements); ++i) {
SSqlObj *pSql = *((SSqlObj**) taosArrayGet(statements, i));
taosReleaseRef(tscObjRef, pSql->self);
}
taosHashCleanup(pVnodeDataBlockHashList);
*result = merged;
// rebuild SubmitMsg::blocks.
for (int i = 0; i < taosArrayGetSize(pMergeDataBlocks); ++i) {
STableDataBlocks* pDataBlocks = *((STableDataBlocks **) taosArrayGet(pMergeDataBlocks, i));
SSubmitMsgBlocksBuilder *builder = createSSubmitMsgBuilder(pDataBlocks->vgId);
if (!builder) {
break ;
}
SSubmitBlk* pBlock = (SSubmitBlk *) (pDataBlocks->pData + pDataBlocks->headerSize);
if (!appendSSubmitMsgBlocks(builder, pBlock, pDataBlocks->numOfTables)) {
destroySSubmitMsgBuilder(builder);
break;
}
size_t nAllocSize = pDataBlocks->headerSize + writenSizeSSubmitMsgBuilder(builder);
char *pData = calloc(1, nAllocSize);
if (!pData) {
destroySSubmitMsgBuilder(builder);
break;
}
pDataBlocks->nAllocSize = nAllocSize;
pDataBlocks->size = pDataBlocks->headerSize + writeSSubmitMsgBlocksBuilder(builder, (SSubmitBlk*)(pData + pDataBlocks->headerSize));
memcpy(pData, pDataBlocks->pData, pDataBlocks->headerSize);
free(pDataBlocks->pData);
pDataBlocks->pData = pData;
destroySSubmitMsgBuilder(builder);
}
destroySTableDataBlocksListBuilder(builder);
return TSDB_CODE_SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册