提交 bd874dd0 编写于 作者: Z zhihaop

fix: resizeDataBlocksData returns false incorrectly

上级 1525769e
...@@ -56,7 +56,9 @@ inline static void tscReturnsError(SSqlObj* pSql, int code) { ...@@ -56,7 +56,9 @@ inline static void tscReturnsError(SSqlObj* pSql, int code) {
} }
pSql->res.code = code; pSql->res.code = code;
tscAsyncResultOnError(pSql); if (pSql->fp) {
pSql->fp(pSql->param, pSql, code);
}
} }
/** /**
......
...@@ -2219,20 +2219,20 @@ static void extractTableNameList(SSqlObj *pSql, SInsertStatementParam *pInsertPa ...@@ -2219,20 +2219,20 @@ static void extractTableNameList(SSqlObj *pSql, SInsertStatementParam *pInsertPa
* @param destSize the destination size. * @param destSize the destination size.
* @return whether is success. * @return whether is success.
*/ */
inline static bool resizeDataBlocksData(STableDataBlocks* dataBlocks, size_t destSize) { static bool resizeDataBlocksData(STableDataBlocks* dataBlocks, size_t destSize) {
if (dataBlocks->nAllocSize >= destSize) { if (dataBlocks->nAllocSize >= destSize) {
return true; return true;
} }
size_t nAllocSize = destSize + (destSize >> 1); size_t nAllocSize = destSize + (destSize >> 1);
char *pData = realloc(dataBlocks->pData, dataBlocks->nAllocSize); char *pData = realloc(dataBlocks->pData, nAllocSize);
if (!pData) { if (!pData) {
return false; return false;
} }
dataBlocks->pData = pData; dataBlocks->pData = pData;
dataBlocks->nAllocSize = nAllocSize; dataBlocks->nAllocSize = nAllocSize;
return false; return true;
} }
/** /**
...@@ -2448,6 +2448,7 @@ static void destroySSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder) { ...@@ -2448,6 +2448,7 @@ static void destroySSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder) {
iter = taosHashIterate(builder->blockBuilders, iter); iter = taosHashIterate(builder->blockBuilders, iter);
} }
taosHashCleanup(builder->blockBuilders); taosHashCleanup(builder->blockBuilders);
free(builder);
} }
/** /**
...@@ -2575,10 +2576,9 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) { ...@@ -2575,10 +2576,9 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) {
// header: SMsgDesc + SSubmitMsg(without SSubmitBlk[]) // header: SMsgDesc + SSubmitMsg(without SSubmitBlk[])
assert(dataBuf->headerSize == (sizeof(SMsgDesc) + sizeof(SSubmitMsg))); assert(dataBuf->headerSize == (sizeof(SMsgDesc) + sizeof(SSubmitMsg)));
assert(dataBuf->headerSize == tableBlock->headerSize); assert(dataBuf->headerSize == tableBlock->headerSize);
const size_t headerSize = tableBlock->headerSize;
const size_t destSize = dataBuf->size + (tableBlock->size - tableBlock->headerSize); const size_t destSize = dataBuf->size + (tableBlock->size - tableBlock->headerSize);
if (!resizeDataBlocksData(dataBuf, debugFlag)) { if (!resizeDataBlocksData(dataBuf, destSize)) {
tscError("0x%" PRIx64 " failed to allocate memory for merging submit block, size:%d", pInsertParam->objectId, tscError("0x%" PRIx64 " failed to allocate memory for merging submit block, size:%d", pInsertParam->objectId,
dataBuf->nAllocSize); dataBuf->nAllocSize);
...@@ -2586,15 +2586,8 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) { ...@@ -2586,15 +2586,8 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) {
tfree(dataBuf->pData); tfree(dataBuf->pData);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
// pData = SMsgDesc + SSubmitMsg(with SSubmitBlk[])
SSubmitBlk* target = (SSubmitBlk* ) (dataBuf->pData + headerSize);
SSubmitBlk* source = (SSubmitBlk* ) (tableBlock->pData + headerSize);
const size_t targetSize = dataBuf->size - headerSize;
const size_t sourceSize = tableBlock->size - headerSize;
memcpy(POINTER_SHIFT(target, targetSize), source, sourceSize); memcpy(dataBuf->pData + dataBuf->size, tableBlock->pData + tableBlock->headerSize, tableBlock->size - tableBlock->headerSize);
dataBuf->size = destSize; dataBuf->size = destSize;
dataBuf->numOfTables += tableBlock->numOfTables; dataBuf->numOfTables += tableBlock->numOfTables;
tscDestroyDataBlock(pSql, tableBlock, false); tscDestroyDataBlock(pSql, tableBlock, false);
...@@ -2602,10 +2595,14 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) { ...@@ -2602,10 +2595,14 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) {
// free the data blocks and sql objs. (because it is no longer needed). // free the data blocks and sql objs. (because it is no longer needed).
taosArrayDestroy(&pInsertParam->pDataBlocks); taosArrayDestroy(&pInsertParam->pDataBlocks);
taosReleaseRef(tscObjRef, pSql->self);
} }
// clean up. // clean up.
for (int i = 1; i < taosArrayGetSize(statements); ++i) {
SSqlObj *pSql = *((SSqlObj**) taosArrayGet(statements, i));
taosReleaseRef(tscObjRef, pSql->self);
}
taosHashCleanup(pVnodeDataBlockHashList); taosHashCleanup(pVnodeDataBlockHashList);
*result = merged; *result = merged;
......
...@@ -134,8 +134,8 @@ int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance ...@@ -134,8 +134,8 @@ int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
// received the statements depends on the network quality. // received the statements depends on the network quality.
bool tsAsyncBatchEnable = true; bool tsAsyncBatchEnable = true;
bool tsAsyncBatchThreadLocal = false; // if thread local enable, each thread will allocate a dispatcher. bool tsAsyncBatchThreadLocal = false; // if thread local enable, each thread will allocate a dispatcher.
int32_t tsAsyncBatchSize = 256; int32_t tsAsyncBatchSize = 96;
int32_t tsAsyncBatchTimeout = 5; int32_t tsAsyncBatchTimeout = 10;
// the maximum allowed query buffer size during query processing for each data node. // the maximum allowed query buffer size during query processing for each data node.
// -1 no limit (default) // -1 no limit (default)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册