提交 c88b3436 编写于 作者: Z zhihaop

fix: the data in pInsertParam->pTableNameList is not corrent

上级 8e0312e9
...@@ -100,7 +100,7 @@ static void batchResultCallback(void* param, TAOS_RES* tres, int32_t code) { ...@@ -100,7 +100,7 @@ static void batchResultCallback(void* param, TAOS_RES* tres, int32_t code) {
} }
taosReleaseRef(tscObjRef, res->self); taosReleaseRef(tscObjRef, res->self);
free(param); free(context);
} }
int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) { int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) {
...@@ -124,30 +124,32 @@ int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) { ...@@ -124,30 +124,32 @@ int32_t dispatcherStatementMerge(SArray* statements, SSqlObj** result) {
// initialize the callback context. // initialize the callback context.
context->count = count; context->count = count;
for (size_t i = 0; i < count; ++i) { for (size_t i = 0; i < count; ++i) {
SSqlObj* statement = *((SSqlObj**)taosArrayGet(statements, i)); SSqlObj* statement = taosArrayGetP(statements, i);
context->runnable[i].fp = statement->fp; context->runnable[i].fp = statement->fp;
context->runnable[i].param = statement->param; context->runnable[i].param = statement->param;
} }
// merge the statements into single one. // merge the statements into single one.
tscDebug("start to merge %zu sql objs", count); tscDebug("start to merge %zu sql objs", count);
SSqlObj *pSql = *((SSqlObj**)taosArrayGet(statements, 0)); SSqlObj *pFirst = taosArrayGetP(statements, 0);
SSqlObj *pNew = createSimpleSubObj(pSql, batchResultCallback, context, TSDB_SQL_INSERT); int32_t code = tscMergeKVPayLoadSqlObj(statements, pFirst);
if (!pNew) {
free(context);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t code = tscMergeKVPayLoadSqlObj(statements, pNew);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
const char* msg = tstrerror(code); const char* msg = tstrerror(code);
tscDebug("failed to merge sql objects: %s", msg); tscDebug("failed to merge sql objects: %s", msg);
free(context); free(context);
taosReleaseRef(tscObjRef, pNew->self); taosReleaseRef(tscObjRef, pFirst->self);
return code; return code;
} }
*result = pNew; pFirst->fp = batchResultCallback;
pFirst->param = context;
pFirst->fetchFp = pFirst->fp;
*result = pFirst;
for (int i = 1; i < count; ++i) {
SSqlObj *pSql = taosArrayGetP(statements, i);
taosReleaseRef(tscObjRef, pSql->self);
}
return code; return code;
} }
......
...@@ -2198,14 +2198,14 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { ...@@ -2198,14 +2198,14 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
return result; return result;
} }
static void extractTableNameList(SSqlObj *pSql, SInsertStatementParam *pInsertParam, SArray* pTableDataBlockList) { static void extractTableNameList(SInsertStatementParam *pInsertParam, SArray* pTableDataBlockList) {
pInsertParam->numOfTables = (int32_t) taosHashGetSize(pInsertParam->pTableBlockHashList); pInsertParam->numOfTables = (int32_t) taosArrayGetSize(pTableDataBlockList);
if (pInsertParam->pTableNameList == NULL) { if (!pInsertParam->pTableNameList) {
pInsertParam->pTableNameList = malloc(pInsertParam->numOfTables * POINTER_BYTES); pInsertParam->pTableNameList = calloc(pInsertParam->numOfTables, sizeof(SName*));
} }
for (int i = 0; i < taosArrayGetSize(pTableDataBlockList); ++i) { for (int i = 0; i < taosArrayGetSize(pTableDataBlockList); ++i) {
STableDataBlocks* pBlocks = *((STableDataBlocks**) taosArrayGet(pTableDataBlockList, i)); STableDataBlocks* pBlocks = taosArrayGetP(pTableDataBlockList, i);
//tfree(pInsertParam->pTableNameList[i]); //tfree(pInsertParam->pTableNameList[i]);
pInsertParam->pTableNameList[i] = tNameDup(&pBlocks->tableName); pInsertParam->pTableNameList[i] = tNameDup(&pBlocks->tableName);
...@@ -2406,6 +2406,7 @@ size_t writeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk ...@@ -2406,6 +2406,7 @@ size_t writeSSubmitMsgBlocksBuilder(SSubmitMsgBlocksBuilder* builder, SSubmitBlk
nWrite += writeSSubmitBlkBuilder(blocksBuilder, pBlock); nWrite += writeSSubmitBlkBuilder(blocksBuilder, pBlock);
iter = taosHashIterate(builder->blockBuilders, iter); iter = taosHashIterate(builder->blockBuilders, iter);
} }
return nWrite; return nWrite;
} }
...@@ -2664,7 +2665,7 @@ static bool appendSTableDataBlocksListBuilder(STableDataBlocksListBuilder* build ...@@ -2664,7 +2665,7 @@ static bool appendSTableDataBlocksListBuilder(STableDataBlocksListBuilder* build
* @param numOfTables the number of tables. * @param numOfTables the number of tables.
* @return the vnode data blocks list. * @return the vnode data blocks list.
*/ */
static SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, int32_t* numOfTables) { static SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* builder, size_t* numOfTables) {
SArray* pVnodeDataBlockList = taosArrayInit(taosHashGetSize(builder->dataBlocksBuilders), sizeof(STableDataBlocks*)); SArray* pVnodeDataBlockList = taosArrayInit(taosHashGetSize(builder->dataBlocksBuilders), sizeof(STableDataBlocks*));
if (!pVnodeDataBlockList) { if (!pVnodeDataBlockList) {
return NULL; return NULL;
...@@ -2677,13 +2678,11 @@ static SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* bui ...@@ -2677,13 +2678,11 @@ static SArray* buildSTableDataBlocksListBuilder(STableDataBlocksListBuilder* bui
if (!dataBlocks) { if (!dataBlocks) {
goto error; goto error;
} }
*numOfTables += dataBlocks->numOfTables;
numOfTables += dataBlocks->numOfTables;
taosArrayPush(pVnodeDataBlockList, &dataBlocks); taosArrayPush(pVnodeDataBlockList, &dataBlocks);
iter = taosHashIterate(builder->dataBlocksBuilders, iter); iter = taosHashIterate(builder->dataBlocksBuilders, iter);
} }
return pVnodeDataBlockList; return pVnodeDataBlockList;
error: error:
...@@ -2695,6 +2694,119 @@ error: ...@@ -2695,6 +2694,119 @@ error:
return NULL; return NULL;
} }
/**
* A Builder to build SInsertStatementParam::pTableNameList.
*/
typedef struct STableNameListBuilder {
SArray* tableNames;
} STableNameListBuilder;
/**
* Create STableNameListBuilder.
*/
STableNameListBuilder* createSTableNameListBuilder() {
STableNameListBuilder* builder = calloc(1, sizeof(STableNameListBuilder));
if (!builder) {
return NULL;
}
builder->tableNames = taosArrayInit(1, sizeof(SName*));
if (!builder->tableNames) {
free(builder);
return NULL;
}
return builder;
}
/**
* Destroy the STableNameListBuilder.
* @param builder the STableNameListBuilder.
*/
void destroySTableNameListBuilder(STableNameListBuilder* builder) {
if (!builder) {
return;
}
taosArrayDestroy(&builder->tableNames);
free(builder);
}
/**
* A util function to compare two SName.
*/
static int32_t compareSName(const void *x, const void *y) {
SName* left = *((SName **) x);
SName* right = *((SName **) y);
if (left == right) {
return 0;
}
return strncmp((const char*) left, (const char*) right, sizeof(SName));
}
/**
* Insert a SName to builder.
*
* @param builder the STableNameListBuilder.
* @param name the table name.
* @return whether it is success.
*/
bool insertSTableNameListBuilder(STableNameListBuilder* builder, SName* name) {
return taosArrayPush(builder->tableNames, &name);
}
/**
* Build the STable name list.
*
* @param builder the STableNameListBuilder.
* @param numOfTables the number of table.
* @return the STable name list.
*/
SName** buildSTableNameListBuilder(STableNameListBuilder* builder, size_t* numOfTables) {
if (!taosArrayGetSize(builder->tableNames)) {
*numOfTables = 0;
return NULL;
}
// sort and unique.
taosArraySort(builder->tableNames, compareSName);
size_t tail = 0;
for (size_t i = 1; i < taosArrayGetSize(builder->tableNames); ++i) {
SName* last = taosArrayGetP(builder->tableNames, tail);
SName* current = taosArrayGetP(builder->tableNames, i);
if (compareSName(last, current) != 0) {
++tail;
taosArraySet(builder->tableNames, tail, &current);
}
}
// build tableNameList
SName** tableNames = calloc(tail + 1, sizeof(SName*));
if (!tableNames) {
return NULL;
}
for (size_t i = 0; i <= tail; ++i) {
SName* clone = malloc(sizeof(SName));
if (!clone) {
goto error;
}
memcpy(clone, taosArrayGetP(builder->tableNames, i), sizeof(SName));
tableNames[i] = clone;
}
*numOfTables = tail + 1;
return tableNames;
error:
for (size_t i = 0; i <= tail; ++i) {
if (tableNames[i]) {
free(tableNames[i]);
}
}
free(tableNames);
return NULL;
}
/** /**
* Merge the KV-PayLoad SQL objects into single one. * Merge the KV-PayLoad SQL objects into single one.
* The statements here must be an insertion statement and no schema attached. * The statements here must be an insertion statement and no schema attached.
...@@ -2714,9 +2826,15 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result) { ...@@ -2714,9 +2826,15 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
STableNameListBuilder* nameListBuilder = createSTableNameListBuilder();
if (!nameListBuilder) {
destroySTableDataBlocksListBuilder(builder);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
// append the existing data blocks to builder. // append the existing data blocks to builder.
for (int i = 0; i < taosArrayGetSize(statements); ++i) { for (size_t i = 0; i < taosArrayGetSize(statements); ++i) {
SSqlObj *pSql = *((SSqlObj**) taosArrayGet(statements, i)); SSqlObj *pSql = taosArrayGetP(statements, i);
SInsertStatementParam* pInsertParam = &pSql->cmd.insertParam; SInsertStatementParam* pInsertParam = &pSql->cmd.insertParam;
if (!pInsertParam->pDataBlocks) { if (!pInsertParam->pDataBlocks) {
continue; continue;
...@@ -2725,32 +2843,72 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result) { ...@@ -2725,32 +2843,72 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj *result) {
assert(pInsertParam->payloadType == PAYLOAD_TYPE_KV); assert(pInsertParam->payloadType == PAYLOAD_TYPE_KV);
assert(!pInsertParam->schemaAttached); assert(!pInsertParam->schemaAttached);
for (int j = 0; j < taosArrayGetSize(pInsertParam->pDataBlocks); ++j) { // append each vnode data block to the builder.
STableDataBlocks * tableBlock = *((STableDataBlocks **) taosArrayGet(pInsertParam->pDataBlocks, j)); for (size_t j = 0; j < taosArrayGetSize(pInsertParam->pDataBlocks); ++j) {
STableDataBlocks * tableBlock = taosArrayGetP(pInsertParam->pDataBlocks, j);
if (!appendSTableDataBlocksListBuilder(builder, tableBlock)) { if (!appendSTableDataBlocksListBuilder(builder, tableBlock)) {
destroySTableDataBlocksListBuilder(builder); destroySTableDataBlocksListBuilder(builder);
destroySTableNameListBuilder(nameListBuilder);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
for (int k = 0; k < pInsertParam->numOfTables; ++k) {
if (!insertSTableNameListBuilder(nameListBuilder, pInsertParam->pTableNameList[k])) {
destroySTableDataBlocksListBuilder(builder);
destroySTableNameListBuilder(nameListBuilder);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
} }
} }
// build the vnode data blocks. // build the vnode data blocks.
int32_t numOfTables = 0; size_t numOfBlocks = 0;
SInsertStatementParam* pInsertParam = &result->cmd.insertParam; SInsertStatementParam* pInsertParam = &result->cmd.insertParam;
SArray* pVnodeDataBlocksList = buildSTableDataBlocksListBuilder(builder, &numOfTables); SArray* pVnodeDataBlocksList = buildSTableDataBlocksListBuilder(builder, &numOfBlocks);
if (!pVnodeDataBlocksList) { if (!pVnodeDataBlocksList) {
destroySTableDataBlocksListBuilder(builder); destroySTableDataBlocksListBuilder(builder);
destroySTableNameListBuilder(nameListBuilder);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
// build the table name list.
size_t numOfTables = 0;
SName** pTableNameList = buildSTableNameListBuilder(nameListBuilder, &numOfTables);
if (!pTableNameList) {
destroySTableDataBlocksListBuilder(builder);
destroySTableNameListBuilder(nameListBuilder);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
if (numOfTables != numOfBlocks) {
printf("numOfTables=%zu, numOfBlocks=%zu\n", numOfTables, numOfBlocks);
}
assert(numOfTables == numOfBlocks);
// replace table name list.
if (pInsertParam->pTableNameList) {
for (size_t i = 0; i < pInsertParam->numOfTables; ++i) {
if (pInsertParam->pTableNameList[i]) {
free(pInsertParam->pTableNameList[i]);
}
}
free(pInsertParam->pTableNameList);
}
pInsertParam->pTableNameList = pTableNameList;
pInsertParam->numOfTables = (int32_t) numOfTables;
// replace vnode data blocks.
if (pInsertParam->pDataBlocks) {
for (size_t i = 0; i < taosArrayGetSize(pInsertParam->pDataBlocks); ++i) {
tscDestroyDataBlock(result, taosArrayGetP(pInsertParam->pDataBlocks, i), false);
}
taosArrayDestroy(&pInsertParam->pDataBlocks);
}
pInsertParam->pDataBlocks = pVnodeDataBlocksList; pInsertParam->pDataBlocks = pVnodeDataBlocksList;
pInsertParam->numOfTables = numOfTables;
// clean up. // clean up.
for (int i = 0; i < taosArrayGetSize(statements); ++i) {
SSqlObj *pSql = *((SSqlObj**) taosArrayGet(statements, i));
taosReleaseRef(tscObjRef, pSql->self);
}
destroySTableDataBlocksListBuilder(builder); destroySTableDataBlocksListBuilder(builder);
destroySTableNameListBuilder(nameListBuilder);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2869,7 +3027,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar ...@@ -2869,7 +3027,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj *pSql, SInsertStatementParam *pInsertPar
pOneTableBlock = *p; pOneTableBlock = *p;
} }
extractTableNameList(pSql, pInsertParam, pTableDataBlockList); extractTableNameList(pInsertParam, pTableDataBlockList);
if (freeBlockMap && pInsertParam->pTableBlockHashList) { if (freeBlockMap && pInsertParam->pTableBlockHashList) {
taosHashCleanup(pInsertParam->pTableBlockHashList); taosHashCleanup(pInsertParam->pTableBlockHashList);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册