diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index a6e30ae0b5b77479d8b3e6cf6c130f5c8817da04..80be8c6a35407045cfc042a810e957a6860ab49d 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -58,7 +58,7 @@ int32_t tsNumOfMnodeFetchThreads = 1; int32_t tsNumOfMnodeReadThreads = 1; int32_t tsNumOfVnodeQueryThreads = 4; int32_t tsNumOfVnodeStreamThreads = 2; -int32_t tsNumOfVnodeFetchThreads = 4; +int32_t tsNumOfVnodeFetchThreads = 1; int32_t tsNumOfVnodeWriteThreads = 2; int32_t tsNumOfVnodeSyncThreads = 2; int32_t tsNumOfVnodeRsmaThreads = 2; @@ -162,8 +162,8 @@ int32_t tsMqRebalanceInterval = 2; int32_t tsTtlUnit = 86400; int32_t tsTtlPushInterval = 86400; int32_t tsGrantHBInterval = 60; -int32_t tsUptimeInterval = 300; // seconds -char tsUdfdResFuncs[1024] = ""; // udfd resident funcs that teardown when udfd exits +int32_t tsUptimeInterval = 300; // seconds +char tsUdfdResFuncs[1024] = ""; // udfd resident funcs that teardown when udfd exits #ifndef _STORAGE int32_t taosSetTfsCfg(SConfig *pCfg) { @@ -366,9 +366,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4); if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1; - tsNumOfVnodeFetchThreads = tsNumOfCores / 4; - tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4); - if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, 0) != 0) return -1; + tsNumOfVnodeFetchThreads = 1; + if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1; tsNumOfVnodeWriteThreads = tsNumOfCores; tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 45b88ec6a5c8143ba192aa7924cd873e4b406e6c..a36307de93aac793fc0168fe1fbbdb9800f0f506 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -52,11 +52,11 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu // TODO: if a block was set but not consumed, // prevent setting a different type of block pInfo->validBlockIndex = 0; - if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { - taosArrayClearP(pInfo->pBlockLists, taosMemoryFree); - } else { - taosArrayClear(pInfo->pBlockLists); - } + /*if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {*/ + /*taosArrayClearP(pInfo->pBlockLists, taosMemoryFree);*/ + /*} else {*/ + taosArrayClear(pInfo->pBlockLists); + /*}*/ if (type == STREAM_INPUT__MERGED_SUBMIT) { // ASSERT(numOfBlocks > 1); @@ -79,7 +79,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } else if (type == STREAM_INPUT__DATA_BLOCK) { for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; + taosArrayPush(pInfo->pBlockLists, &pDataBlock); +#if 0 // TODO optimize SSDataBlock* p = createOneDataBlock(pDataBlock, false); p->info = pDataBlock->info; @@ -87,6 +89,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu taosArrayClear(p->pDataBlock); taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); taosArrayPush(pInfo->pBlockLists, &p); +#endif } pInfo->blockType = STREAM_INPUT__DATA_BLOCK; } else { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 4d16b2e17047de683f2c72ef871fd907e6dc1d81..8dcc2cd39be48e6d7c3d2cc6e14ed9239286cbb5 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1602,26 +1602,30 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } break; case STREAM_DELETE_DATA: { printDataBlock(pBlock, "stream scan delete recv"); + SSDataBlock* pDelBlock = NULL; if (pInfo->tqReader) { - SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); + pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); filterDelBlockByUid(pDelBlock, pBlock, pInfo); - pBlock = pDelBlock; + } else { + pDelBlock = pBlock; } - printDataBlock(pBlock, "stream scan delete recv filtered"); if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) { - generateDeleteResultBlock(pInfo, pBlock, pInfo->pDeleteDataRes); + generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes); pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT; - printDataBlock(pBlock, "stream scan delete result"); + printDataBlock(pDelBlock, "stream scan delete result"); return pInfo->pDeleteDataRes; } else { pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->updateResIndex = 0; - generateScanRange(pInfo, pBlock, pInfo->pUpdateRes); + generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; - printDataBlock(pBlock, "stream scan delete data"); + printDataBlock(pDelBlock, "stream scan delete data"); + if (pInfo->tqReader) { + blockDataDestroy(pDelBlock); + } return pInfo->pDeleteDataRes; } } break;