提交 323cf595 编写于 作者: Z zhihaop

refactor: move tscMergeKVPayLoadSqlObj from tscUtil.c to tscDataBlockMerge.c

上级 d138f77a
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCDATABLOCKMERGE_H
#define TDENGINE_TSCDATABLOCKMERGE_H
#include "hash.h"
#include "taosmsg.h"
#include "tarray.h"
#include "tscUtil.h"
#include "tsclient.h"
#ifdef __cplusplus
extern "C" {
#endif
/**
* A builder of SSubmitBlk.
*/
typedef struct SSubmitBlkBuilder {
// the metadata of the SSubmitBlk.
SSubmitBlk* metadata;
// the array stores all the rows in a table, aka SArray<SMemRow>.
SArray* rows;
} SSubmitBlkBuilder;
/**
* The builder to build SSubmitMsg::blocks.
*/
typedef struct SSubmitMsgBlocksBuilder {
// SHashObj<table_uid, SSubmitBlkBuilder*>.
SHashObj* blockBuilders;
int64_t vgId;
} SSubmitMsgBlocksBuilder;
/**
* STableDataBlocksBuilder is a tool to build data blocks by append the existing data blocks in a vnode.
*/
typedef struct STableDataBlocksBuilder {
SSubmitMsgBlocksBuilder* blocksBuilder;
STableDataBlocks* firstBlock;
int64_t vgId;
} STableDataBlocksBuilder;
/**
* STableDataBlocksListBuilder is a tool to build vnode data blocks list by appending exist data blocks.
*/
typedef struct STableDataBlocksListBuilder {
SHashObj* dataBlocksBuilders;
} STableDataBlocksListBuilder;
/**
* A Builder to build SInsertStatementParam::pTableNameList.
*/
typedef struct STableNameListBuilder {
// store the unsorted table names, SArray<SName*>.
SArray* tableNames;
} STableNameListBuilder;
/**
* Create a SSubmitBlkBuilder using exist metadata.
*
* @param metadata the metadata.
* @return the SSubmitBlkBuilder.
*/
SSubmitBlkBuilder* createSSubmitBlkBuilder(SSubmitBlk* metadata);
/**
* Destroy the SSubmitBlkBuilder.
*
* @param builder the SSubmitBlkBuilder.
*/
void destroySSubmitBlkBuilder(SSubmitBlkBuilder* builder);
/**
* Append a SSubmitBlk* to the builder. The table uid in pBlock must be the same with the builder's.
*
* @param builder the SSubmitBlkBuilder.
* @param pBlock the pBlock to append.
* @return whether the append is success.
*/
bool appendSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk *pBlock);
/**
* Build and write SSubmitBlk to `target`
*
* @param builder the SSubmitBlkBuilder.
* @param target the target to write.
* @param nRows the number of rows in SSubmitBlk*.
* @return the writen bytes.
*/
size_t writeSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* target, size_t* nRows);
/**
* Get the expected writen bytes of `writeSSubmitBlkBuilder`.
*
* @param builder the SSubmitBlkBuilder.
* @return the expected writen bytes of `writeSSubmitBlkBuilder`.
*/
size_t nWriteSSubmitBlkBuilder(SSubmitBlkBuilder* builder);
/**
* Create a SSubmitMsgBuilder.
*
* @param vgId the vgId of SSubmitMsg.
* @return the SSubmitMsgBuilder.
*/
SSubmitMsgBlocksBuilder* createSSubmitMsgBuilder(int64_t vgId);
/**
* Get the expected writen bytes of `writeSSubmitMsgBlocksBuilder`.
*
* @param builder the SSubmitMsgBlocksBuilder.
* @return the expected writen bytes of `writeSSubmitMsgBlocksBuilder`.
*/
size_t nWriteSSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder);
/**
* Build and write SSubmitMsg::blocks to `pBlocks`
*
* @param builder the SSubmitBlkBuilder.
* @param pBlocks the target to write.
* @param nRows the number of row in SSubmitMsg::blocks.
* @return the writen bytes.
*/
size_t writeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t* nRows);
/**
* Get the number of block in SSubmitMsgBlocksBuilder.
* @param builder the SSubmitMsgBlocksBuilder.
* @return the number of SSubmitBlk block.
*/
size_t nBlockSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder);
/**
* Destroy the SSubmitMsgBlocksBuilder.
*
* @param builder the SSubmitMsgBlocksBuilder to destroy.
*/
void destroySSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder);
/**
* Append SSubmitMsg* to the SSubmitMsgBlocksBuilder.
*
* @param builder the SSubmitMsgBlocksBuilder.
* @param pBlocks the SSubmitBlk in SSubmitMsg::blocks.
* @param nBlocks the number of blocks in SSubmitMsg.
* @return whether the append is success.
*/
bool appendSSubmitMsgBlocks(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t nBlocks);
/**
* Create the STableDataBlocksBuilder.
*
* @param vgId the vgId of STableDataBlocksBuilder.
* @return the STableDataBlocksBuilder.
*/
STableDataBlocksBuilder* createSTableDataBlocksBuilder(int64_t vgId);
/**
* Destroy the STableDataBlocksBuilder.
* @param builder the STableDataBlocksBuilder.
*/
void destroySTableDataBlocksBuilder(STableDataBlocksBuilder *builder);
/**
* Append a data blocks to STableDataBlocksBuilder.
* @param builder the STableDataBlocksBuilder.
* @param dataBlocks the dataBlocks to append. the vgId of dataBlocks must be same with the STableDataBlocksBuilder.
* @return whether the append is success.
*/
bool appendSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, STableDataBlocks* dataBlocks);
/**
* Build the data blocks for single vnode.
* @param builder the STableDataBlocksBuilder.
* @param nRows the number of row in STableDataBlocks.
* @return the data blocks for single vnode.
*/
STableDataBlocks* buildSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, size_t* nRows);
/**
* Create the STableDataBlocksListBuilder.
*
* @return the STableDataBlocksListBuilder.
*/
STableDataBlocksListBuilder* createSTableDataBlocksListBuilder();
/**
* Destroy the STableDataBlocksListBuilder.
*
* @param builder the STableDataBlocksListBuilder.
*/
void destroySTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder);
/**
* Append a data blocks to STableDataBlocksListBuilder.
*
* @param builder the STableDataBlocksListBuilder.
* @param dataBlocks the data blocks.
* @return whether the append is success.
*/
bool appendSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, STableDataBlocks* dataBlocks);
/**
* Build the vnode data blocks list.
*
* @param builder the STableDataBlocksListBuilder.
* @param nTables the number of table in vnode data blocks list.
* @param nRows the number of row in vnode data blocks list.
* @return the vnode data blocks list.
*/
SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, size_t* nTables, size_t* nRows);
/**
* Create STableNameListBuilder.
*/
STableNameListBuilder* createSTableNameListBuilder();
/**
* Destroy the STableNameListBuilder.
* @param builder the STableNameListBuilder.
*/
void destroySTableNameListBuilder(STableNameListBuilder* builder);
/**
* Insert a SName to builder.
*
* @param builder the STableNameListBuilder.
* @param name the table name.
* @return whether it is success.
*/
bool insertSTableNameListBuilder(STableNameListBuilder* builder, SName* name);
/**
* Build the STable name list.
*
* @param builder the STableNameListBuilder.
* @param numOfTables the number of table.
* @return the STable name list.
*/
SName** buildSTableNameListBuilder(STableNameListBuilder* builder, size_t* numOfTables);
/**
* Merge the KV-PayLoad SQL objects into single one.
* The statements here must be an insertion statement and no schema attached.
*
* @param statements the array of statements. a.k.a SArray<SSqlObj*>.
* @param result the returned result. result is not null!
* @return the status code. usually TSDB_CODE_SUCCESS.
*/
int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCDATABLOCKMERGE_H
......@@ -149,7 +149,6 @@ void* tscDestroyUdfArrayList(SArray* pUdfList);
void* tscDestroyBlockHashTable(SSqlObj* pSql, SHashObj* pBlockHashTable, bool removeMeta);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result);
int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertParam, bool freeBlockMap);
int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, SName* pName, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks, SArray* pBlockList);
......
......@@ -15,6 +15,7 @@
#include "osAtomic.h"
#include "tscDataBlockMerge.h"
#include "tscBulkWrite.h"
#include "tscLog.h"
#include "tscSubquery.h"
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tscDataBlockMerge.h"
/**
* A util function to compare two SName.
*/
static int32_t compareSName(const void *x, const void *y) {
SName* left = *((SName **) x);
SName* right = *((SName **) y);
if (left == right) {
return 0;
}
return strncmp((const char*) left, (const char*) right, sizeof(SName));
}
/**
* A util function to sort SArray<SMemRow> by key.
*/
static int32_t compareSMemRow(const void *x, const void *y) {
TSKEY left = memRowKey(*(void **) x);
TSKEY right = memRowKey(*(void **) y);
if (left == right) {
return 0;
} else {
return left > right ? 1 : -1;
}
}
/**
* If the SSubmitBlkBuilder of pBlock->uid is present, returns it. Otherwise, build a new SSubmitBlkBuilder.
*
* @param builder the SSubmitMsgBlocksBuilder.
* @param pBlock the SSubmitBlk.
* @return the SSubmitBlkBuilder (NULL means failure).
*/
inline static SSubmitBlkBuilder* computeIfAbsentSSubmitBlkBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlock) {
SSubmitBlkBuilder** iter = taosHashGet(builder->blockBuilders, &pBlock->uid, sizeof(pBlock->uid));
SSubmitBlkBuilder* blocksBuilder = NULL;
if (iter) {
return *iter;
}
blocksBuilder = createSSubmitBlkBuilder(pBlock);
if (!blocksBuilder) {
return NULL;
}
if (taosHashPut(builder->blockBuilders, &pBlock->uid, sizeof(pBlock->uid), &blocksBuilder, sizeof(SArray*))) {
destroySSubmitBlkBuilder(blocksBuilder);
return NULL;
}
return blocksBuilder;
}
SName** buildSTableNameListBuilder(STableNameListBuilder* builder, size_t* numOfTables) {
if (!taosArrayGetSize(builder->tableNames)) {
*numOfTables = 0;
return NULL;
}
// sort and unique.
taosArraySort(builder->tableNames, compareSName);
size_t tail = 0;
for (size_t i = 1; i < taosArrayGetSize(builder->tableNames); ++i) {
SName* last = taosArrayGetP(builder->tableNames, tail);
SName* current = taosArrayGetP(builder->tableNames, i);
if (compareSName(last, current) != 0) {
++tail;
taosArraySet(builder->tableNames, tail, &current);
}
}
// build table names list.
SName** tableNames = calloc(tail + 1, sizeof(SName*));
if (!tableNames) {
return NULL;
}
// clone data.
for (size_t i = 0; i <= tail; ++i) {
SName* clone = malloc(sizeof(SName));
if (!clone) {
goto error;
}
memcpy(clone, taosArrayGetP(builder->tableNames, i), sizeof(SName));
tableNames[i] = clone;
}
*numOfTables = tail + 1;
return tableNames;
error:
for (size_t i = 0; i <= tail; ++i) {
if (tableNames[i]) {
free(tableNames[i]);
}
}
free(tableNames);
return NULL;
}
SSubmitBlkBuilder* createSSubmitBlkBuilder(SSubmitBlk* metadata) {
SSubmitBlkBuilder* builder = calloc(1, sizeof(SSubmitBlkBuilder));
if (!builder) {
return NULL;
}
builder->rows = taosArrayInit(1, sizeof(SMemRow));
if (!builder->rows) {
free(builder);
return NULL;
}
builder->metadata = calloc(1, sizeof(SSubmitBlk));
if (!builder->metadata) {
taosArrayDestroy(&builder->rows);
free(builder);
return NULL;
}
memcpy(builder->metadata, metadata, sizeof(SSubmitBlk));
return builder;
}
void destroySSubmitBlkBuilder(SSubmitBlkBuilder* builder) {
if (!builder) {
return;
}
taosArrayDestroy(&builder->rows);
free(builder->metadata);
free(builder);
}
bool appendSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* pBlock) {
assert(pBlock->uid == builder->metadata->uid);
assert(pBlock->schemaLen == 0);
// shadow copy all the SMemRow to SSubmitBlkBuilder::rows.
char* pRow = pBlock->data;
char* pEnd = pBlock->data + htonl(pBlock->dataLen);
while (pRow < pEnd) {
if (!taosArrayPush(builder->rows, &pRow)) {
return false;
}
pRow += memRowTLen(pRow);
}
return true;
}
size_t writeSSubmitBlkBuilder(SSubmitBlkBuilder* builder, SSubmitBlk* target, size_t* nRows) {
memcpy(target, builder->metadata, sizeof(SSubmitBlk));
// sort SSubmitBlkBuilder::rows by timestamp.
uint32_t dataLen = 0;
taosArraySort(builder->rows, compareSMemRow);
// deep copy all the SMemRow to target.
for (int i = 0; i < taosArrayGetSize(builder->rows); ++i) {
char* pRow = taosArrayGetP(builder->rows, i);
memcpy(POINTER_SHIFT(target->data, dataLen), pRow, memRowTLen(pRow));
dataLen += memRowTLen(pRow);
}
*nRows = taosArrayGetSize(builder->rows);
target->schemaLen = 0;
target->dataLen = (int32_t) htonl(dataLen);
target->numOfRows = (int16_t) htons(*nRows);
return dataLen + sizeof(SSubmitBlk);
}
size_t nWriteSSubmitBlkBuilder(SSubmitBlkBuilder* builder) {
size_t dataLen = 0;
for (int i = 0; i < taosArrayGetSize(builder->rows); ++i) {
char* pRow = taosArrayGetP(builder->rows, i);
dataLen += memRowTLen(pRow);
}
return dataLen + sizeof(SSubmitBlk);
}
SSubmitMsgBlocksBuilder* createSSubmitMsgBuilder(int64_t vgId) {
SSubmitMsgBlocksBuilder* builder = calloc(1, sizeof(SSubmitMsgBlocksBuilder));
if (!builder) {
return NULL;
}
builder->vgId = vgId;
builder->blockBuilders = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (!builder->blockBuilders) {
free(builder);
return NULL;
}
return builder;
}
size_t nWriteSSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder) {
size_t nWrite = 0;
SSubmitBlkBuilder** iter = taosHashIterate(builder->blockBuilders, NULL);
while (iter) {
SSubmitBlkBuilder* blocksBuilder = *iter;
nWrite += nWriteSSubmitBlkBuilder(blocksBuilder);
iter = taosHashIterate(builder->blockBuilders, iter);
}
return nWrite;
}
size_t writeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t* nRows) {
size_t nWrite = 0;
SSubmitBlkBuilder** iter = taosHashIterate(builder->blockBuilders, NULL);
// copy all the SSubmitBlk to pBlocks.
while (iter) {
size_t nSubRows = 0;
SSubmitBlkBuilder* blocksBuilder = *iter;
SSubmitBlk* pBlock = POINTER_SHIFT(pBlocks, nWrite);
nWrite += writeSSubmitBlkBuilder(blocksBuilder, pBlock, &nSubRows);
*nRows += nSubRows;
iter = taosHashIterate(builder->blockBuilders, iter);
}
return nWrite;
}
size_t nBlockSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder) {
return taosHashGetSize(builder->blockBuilders);
}
void destroySSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder) {
if (!builder) {
return;
}
SSubmitBlkBuilder** iter = taosHashIterate(builder->blockBuilders, NULL);
while (iter) {
destroySSubmitBlkBuilder(*iter);
iter = taosHashIterate(builder->blockBuilders, iter);
}
taosHashCleanup(builder->blockBuilders);
free(builder);
}
bool appendSSubmitMsgBlocks(SSubmitMsgBlocksBuilder* builder, SSubmitBlk* pBlocks, size_t nBlocks) {
SSubmitBlk* pBlock = pBlocks;
for (size_t i = 0; i < nBlocks; ++i) {
// not support SSubmitBlk with schema.
assert(pBlock->schemaLen == 0);
// get the builder of specific table (by pBlock->uid).
SSubmitBlkBuilder* blocksBuilder = computeIfAbsentSSubmitBlkBuilder(builder, pBlock);
if (!blocksBuilder) {
return false;
}
if (!appendSSubmitBlkBuilder(blocksBuilder, pBlock)) {
return false;
}
// go to next block.
size_t blockSize = sizeof (SSubmitBlk) + htonl(pBlock->dataLen);
pBlock = POINTER_SHIFT(pBlock, blockSize);
}
return true;
}
STableDataBlocksBuilder* createSTableDataBlocksBuilder(int64_t vgId) {
STableDataBlocksBuilder* builder = calloc(1, sizeof(STableDataBlocksBuilder));
if (!builder) {
return NULL;
}
builder->blocksBuilder = createSSubmitMsgBuilder(vgId);
if (!builder->blocksBuilder) {
free(builder);
return NULL;
}
builder->vgId = vgId;
builder->firstBlock = NULL;
return builder;
}
void destroySTableDataBlocksBuilder(STableDataBlocksBuilder* builder) {
if (!builder) {
return;
}
destroySSubmitMsgBuilder(builder->blocksBuilder);
free(builder);
}
bool appendSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, STableDataBlocks* dataBlocks) {
// the data blocks vgId must be same with builder vgId.
if (!dataBlocks || dataBlocks->vgId != builder->vgId) {
return false;
}
if (!builder->firstBlock) {
builder->firstBlock = dataBlocks;
}
SSubmitBlk* pBlocks = (SSubmitBlk *)(dataBlocks->pData + dataBlocks->headerSize);
return appendSSubmitMsgBlocks(builder->blocksBuilder, pBlocks, dataBlocks->numOfTables);
}
STableDataBlocks* buildSTableDataBlocksBuilder(STableDataBlocksBuilder* builder, size_t* nRows) {
SSubmitMsgBlocksBuilder* blocksBuilder = builder->blocksBuilder;
STableDataBlocks *firstBlock = builder->firstBlock;
if (!firstBlock) {
return NULL;
}
size_t nWriteSize = nWriteSSubmitMsgBuilder(builder->blocksBuilder);
size_t nHeaderSize = firstBlock->headerSize;
size_t nAllocSize = nWriteSize + nHeaderSize;
// allocate data blocks.
STableDataBlocks* dataBlocks = NULL;
int32_t code = tscCreateDataBlock(nAllocSize, 0, (int32_t) nHeaderSize, &firstBlock->tableName, firstBlock->pTableMeta, &dataBlocks);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
// build the header (using first block).
dataBlocks->size = nHeaderSize;
memcpy(dataBlocks->pData, firstBlock->pData, nHeaderSize);
// build the SSubmitMsg::blocks.
dataBlocks->size += writeSSubmitMsgBlocksBuilder(blocksBuilder, (SSubmitBlk *) (dataBlocks->pData + nHeaderSize), nRows);
dataBlocks->numOfTables = (int32_t) nBlockSSubmitMsgBlocksBuilder(blocksBuilder);
return dataBlocks;
}
STableDataBlocksListBuilder* createSTableDataBlocksListBuilder() {
STableDataBlocksListBuilder* builder = calloc(1, sizeof(STableDataBlocksListBuilder));
if (!builder) {
return NULL;
}
builder->dataBlocksBuilders = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (!builder->dataBlocksBuilders) {
free(builder);
return NULL;
}
return builder;
}
void destroySTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder) {
if (!builder) {
return;
}
STableDataBlocksBuilder** iter = taosHashIterate(builder->dataBlocksBuilders, NULL);
while (iter) {
destroySTableDataBlocksBuilder(*iter);
iter = taosHashIterate(builder->dataBlocksBuilders, iter);
}
taosHashCleanup(builder->dataBlocksBuilders);
free(builder);
}
bool appendSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, STableDataBlocks* dataBlocks) {
// get the data blocks builder of specific vgId.
STableDataBlocksBuilder** item = taosHashGet(builder->dataBlocksBuilders, &dataBlocks->vgId, sizeof(dataBlocks->vgId));
STableDataBlocksBuilder* blocksBuilder = NULL;
if (item) {
blocksBuilder = *item;
} else {
blocksBuilder = createSTableDataBlocksBuilder(dataBlocks->vgId);
if (!blocksBuilder) {
return false;
}
if (taosHashPut(builder->dataBlocksBuilders, &dataBlocks->vgId, sizeof(dataBlocks->vgId), &blocksBuilder, sizeof(STableDataBlocksBuilder*))) {
destroySTableDataBlocksBuilder(blocksBuilder);
return false;
}
}
// append to this builder.
return appendSTableDataBlocksBuilder(blocksBuilder, dataBlocks);
}
SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, size_t* nTables, size_t* nRows) {
SArray* pVnodeDataBlockList = taosArrayInit(taosHashGetSize(builder->dataBlocksBuilders), sizeof(STableDataBlocks*));
if (!pVnodeDataBlockList) {
return NULL;
}
// build data blocks of each vgId.
STableDataBlocksBuilder** iter = taosHashIterate(builder->dataBlocksBuilders, NULL);
while (iter) {
size_t nSubRows = 0;
STableDataBlocksBuilder* dataBlocksBuilder = *iter;
STableDataBlocks* dataBlocks = buildSTableDataBlocksBuilder(dataBlocksBuilder, &nSubRows);
if (!dataBlocks) {
goto error;
}
*nTables += dataBlocks->numOfTables;
*nRows += nSubRows;
taosArrayPush(pVnodeDataBlockList, &dataBlocks);
iter = taosHashIterate(builder->dataBlocksBuilders, iter);
}
return pVnodeDataBlockList;
error:
for (int i = 0; i < taosArrayGetSize(pVnodeDataBlockList); ++i) {
STableDataBlocks* dataBlocks = taosArrayGetP(pVnodeDataBlockList, i);
tscDestroyDataBlock(NULL, dataBlocks, false);
}
taosArrayDestroy(&pVnodeDataBlockList);
return NULL;
}
STableNameListBuilder* createSTableNameListBuilder() {
STableNameListBuilder* builder = calloc(1, sizeof(STableNameListBuilder));
if (!builder) {
return NULL;
}
builder->tableNames = taosArrayInit(1, sizeof(SName*));
if (!builder->tableNames) {
free(builder);
return NULL;
}
return builder;
}
void destroySTableNameListBuilder(STableNameListBuilder* builder) {
if (!builder) {
return;
}
taosArrayDestroy(&builder->tableNames);
free(builder);
}
bool insertSTableNameListBuilder(STableNameListBuilder* builder, SName* name) {
return taosArrayPush(builder->tableNames, &name);
}
int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj* result) {
// statement array is empty.
if (!statements || !taosArrayGetSize(statements)) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
STableDataBlocksListBuilder* builder = createSTableDataBlocksListBuilder();
if (!builder) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
STableNameListBuilder* nameListBuilder = createSTableNameListBuilder();
if (!nameListBuilder) {
destroySTableDataBlocksListBuilder(builder);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
// append the existing data blocks to builder.
for (size_t i = 0; i < taosArrayGetSize(statements); ++i) {
SSqlObj *pSql = taosArrayGetP(statements, i);
SInsertStatementParam* pInsertParam = &pSql->cmd.insertParam;
if (!pInsertParam->pDataBlocks) {
continue;
}
assert(pInsertParam->payloadType == PAYLOAD_TYPE_KV);
assert(!pInsertParam->schemaAttached);
// append each vnode data block to the builder.
for (size_t j = 0; j < taosArrayGetSize(pInsertParam->pDataBlocks); ++j) {
STableDataBlocks * tableBlock = taosArrayGetP(pInsertParam->pDataBlocks, j);
if (!appendSTableDataBlocksListBuilder(builder, tableBlock)) {
destroySTableDataBlocksListBuilder(builder);
destroySTableNameListBuilder(nameListBuilder);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int k = 0; k < pInsertParam->numOfTables; ++k) {
if (!insertSTableNameListBuilder(nameListBuilder, pInsertParam->pTableNameList[k])) {
destroySTableDataBlocksListBuilder(builder);
destroySTableNameListBuilder(nameListBuilder);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
}
}
// build the vnode data blocks.
size_t nBlocks = 0;
size_t nRows = 0;
SInsertStatementParam* pInsertParam = &result->cmd.insertParam;
SArray* pVnodeDataBlocksList = buildSTableDataBlocksListBuilder(builder, &nBlocks, &nRows);
if (!pVnodeDataBlocksList) {
destroySTableDataBlocksListBuilder(builder);
destroySTableNameListBuilder(nameListBuilder);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
// build the table name list.
size_t nTables = 0;
SName** pTableNameList = buildSTableNameListBuilder(nameListBuilder, &nTables);
if (!pTableNameList) {
destroySTableDataBlocksListBuilder(builder);
destroySTableNameListBuilder(nameListBuilder);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
assert(nTables == nBlocks);
// replace table name list.
if (pInsertParam->pTableNameList) {
destroyTableNameList(pInsertParam);
}
pInsertParam->pTableNameList = pTableNameList;
pInsertParam->numOfTables = (int32_t) nTables;
// replace vnode data blocks.
if (pInsertParam->pDataBlocks) {
tscDestroyBlockArrayList(result, pInsertParam->pDataBlocks);
}
pInsertParam->pDataBlocks = pVnodeDataBlocksList;
pInsertParam->numOfRows = (int32_t) nRows;
// clean up.
destroySTableDataBlocksListBuilder(builder);
destroySTableNameListBuilder(nameListBuilder);
return TSDB_CODE_SUCCESS;
}
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册