From bd874dd08072a01ca9b34035a13f08461eb6c6a5 Mon Sep 17 00:00:00 2001 From: zhihaop Date: Fri, 23 Sep 2022 00:29:01 +0800 Subject: [PATCH] fix: resizeDataBlocksData returns false incorrectly --- src/client/src/tscBulkWrite.c | 4 +++- src/client/src/tscUtil.c | 25 +++++++++++-------------- src/common/src/tglobal.c | 4 ++-- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/src/client/src/tscBulkWrite.c b/src/client/src/tscBulkWrite.c index 65991e1572..972793a5d6 100644 --- a/src/client/src/tscBulkWrite.c +++ b/src/client/src/tscBulkWrite.c @@ -56,7 +56,9 @@ inline static void tscReturnsError(SSqlObj* pSql, int code) { } pSql->res.code = code; - tscAsyncResultOnError(pSql); + if (pSql->fp) { + pSql->fp(pSql->param, pSql, code); + } } /** diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 479de72dd6..a8b5d91be3 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2219,20 +2219,20 @@ static void extractTableNameList(SSqlObj *pSql, SInsertStatementParam *pInsertPa * @param destSize the destination size. * @return whether is success. */ -inline static bool resizeDataBlocksData(STableDataBlocks* dataBlocks, size_t destSize) { +static bool resizeDataBlocksData(STableDataBlocks* dataBlocks, size_t destSize) { if (dataBlocks->nAllocSize >= destSize) { return true; } size_t nAllocSize = destSize + (destSize >> 1); - char *pData = realloc(dataBlocks->pData, dataBlocks->nAllocSize); + char *pData = realloc(dataBlocks->pData, nAllocSize); if (!pData) { return false; } dataBlocks->pData = pData; dataBlocks->nAllocSize = nAllocSize; - return false; + return true; } /** @@ -2448,6 +2448,7 @@ static void destroySSubmitMsgBuilder(SSubmitMsgBlocksBuilder* builder) { iter = taosHashIterate(builder->blockBuilders, iter); } taosHashCleanup(builder->blockBuilders); + free(builder); } /** @@ -2575,10 +2576,9 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) { // header: SMsgDesc + SSubmitMsg(without SSubmitBlk[]) assert(dataBuf->headerSize == (sizeof(SMsgDesc) + sizeof(SSubmitMsg))); assert(dataBuf->headerSize == tableBlock->headerSize); - const size_t headerSize = tableBlock->headerSize; const size_t destSize = dataBuf->size + (tableBlock->size - tableBlock->headerSize); - if (!resizeDataBlocksData(dataBuf, debugFlag)) { + if (!resizeDataBlocksData(dataBuf, destSize)) { tscError("0x%" PRIx64 " failed to allocate memory for merging submit block, size:%d", pInsertParam->objectId, dataBuf->nAllocSize); @@ -2586,15 +2586,8 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) { tfree(dataBuf->pData); return TSDB_CODE_TSC_OUT_OF_MEMORY; } - - // pData = SMsgDesc + SSubmitMsg(with SSubmitBlk[]) - SSubmitBlk* target = (SSubmitBlk* ) (dataBuf->pData + headerSize); - SSubmitBlk* source = (SSubmitBlk* ) (tableBlock->pData + headerSize); - - const size_t targetSize = dataBuf->size - headerSize; - const size_t sourceSize = tableBlock->size - headerSize; - memcpy(POINTER_SHIFT(target, targetSize), source, sourceSize); + memcpy(dataBuf->pData + dataBuf->size, tableBlock->pData + tableBlock->headerSize, tableBlock->size - tableBlock->headerSize); dataBuf->size = destSize; dataBuf->numOfTables += tableBlock->numOfTables; tscDestroyDataBlock(pSql, tableBlock, false); @@ -2602,10 +2595,14 @@ int32_t tscMergeKVPayLoadSqlObj(SArray* statements, SSqlObj **result) { // free the data blocks and sql objs. (because it is no longer needed). taosArrayDestroy(&pInsertParam->pDataBlocks); - taosReleaseRef(tscObjRef, pSql->self); } // clean up. + for (int i = 1; i < taosArrayGetSize(statements); ++i) { + SSqlObj *pSql = *((SSqlObj**) taosArrayGet(statements, i)); + taosReleaseRef(tscObjRef, pSql->self); + } + taosHashCleanup(pVnodeDataBlockHashList); *result = merged; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 319503ee70..f09bb3abb2 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -134,8 +134,8 @@ int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance // received the statements depends on the network quality. bool tsAsyncBatchEnable = true; bool tsAsyncBatchThreadLocal = false; // if thread local enable, each thread will allocate a dispatcher. -int32_t tsAsyncBatchSize = 256; -int32_t tsAsyncBatchTimeout = 5; +int32_t tsAsyncBatchSize = 96; +int32_t tsAsyncBatchTimeout = 10; // the maximum allowed query buffer size during query processing for each data node. // -1 no limit (default) -- GitLab