提交 ae294b98 编写于 作者: L Liu Jicong

enh(stream): blocklist do not copy

上级 72fba5a8
...@@ -58,7 +58,7 @@ int32_t tsNumOfMnodeFetchThreads = 1; ...@@ -58,7 +58,7 @@ int32_t tsNumOfMnodeFetchThreads = 1;
int32_t tsNumOfMnodeReadThreads = 1; int32_t tsNumOfMnodeReadThreads = 1;
int32_t tsNumOfVnodeQueryThreads = 4; int32_t tsNumOfVnodeQueryThreads = 4;
int32_t tsNumOfVnodeStreamThreads = 2; int32_t tsNumOfVnodeStreamThreads = 2;
int32_t tsNumOfVnodeFetchThreads = 4; int32_t tsNumOfVnodeFetchThreads = 1;
int32_t tsNumOfVnodeWriteThreads = 2; int32_t tsNumOfVnodeWriteThreads = 2;
int32_t tsNumOfVnodeSyncThreads = 2; int32_t tsNumOfVnodeSyncThreads = 2;
int32_t tsNumOfVnodeRsmaThreads = 2; int32_t tsNumOfVnodeRsmaThreads = 2;
...@@ -162,8 +162,8 @@ int32_t tsMqRebalanceInterval = 2; ...@@ -162,8 +162,8 @@ int32_t tsMqRebalanceInterval = 2;
int32_t tsTtlUnit = 86400; int32_t tsTtlUnit = 86400;
int32_t tsTtlPushInterval = 86400; int32_t tsTtlPushInterval = 86400;
int32_t tsGrantHBInterval = 60; int32_t tsGrantHBInterval = 60;
int32_t tsUptimeInterval = 300; // seconds int32_t tsUptimeInterval = 300; // seconds
char tsUdfdResFuncs[1024] = ""; // udfd resident funcs that teardown when udfd exits char tsUdfdResFuncs[1024] = ""; // udfd resident funcs that teardown when udfd exits
#ifndef _STORAGE #ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) { int32_t taosSetTfsCfg(SConfig *pCfg) {
...@@ -366,9 +366,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -366,9 +366,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4); tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1;
tsNumOfVnodeFetchThreads = tsNumOfCores / 4; tsNumOfVnodeFetchThreads = 1;
tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4); if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, 0) != 0) return -1;
tsNumOfVnodeWriteThreads = tsNumOfCores; tsNumOfVnodeWriteThreads = tsNumOfCores;
tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1); tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1);
......
...@@ -52,11 +52,11 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -52,11 +52,11 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
// TODO: if a block was set but not consumed, // TODO: if a block was set but not consumed,
// prevent setting a different type of block // prevent setting a different type of block
pInfo->validBlockIndex = 0; pInfo->validBlockIndex = 0;
if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { /*if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {*/
taosArrayClearP(pInfo->pBlockLists, taosMemoryFree); /*taosArrayClearP(pInfo->pBlockLists, taosMemoryFree);*/
} else { /*} else {*/
taosArrayClear(pInfo->pBlockLists); taosArrayClear(pInfo->pBlockLists);
} /*}*/
if (type == STREAM_INPUT__MERGED_SUBMIT) { if (type == STREAM_INPUT__MERGED_SUBMIT) {
// ASSERT(numOfBlocks > 1); // ASSERT(numOfBlocks > 1);
...@@ -79,7 +79,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -79,7 +79,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
} else if (type == STREAM_INPUT__DATA_BLOCK) { } else if (type == STREAM_INPUT__DATA_BLOCK) {
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
taosArrayPush(pInfo->pBlockLists, &pDataBlock);
#if 0
// TODO optimize // TODO optimize
SSDataBlock* p = createOneDataBlock(pDataBlock, false); SSDataBlock* p = createOneDataBlock(pDataBlock, false);
p->info = pDataBlock->info; p->info = pDataBlock->info;
...@@ -87,6 +89,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -87,6 +89,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
taosArrayClear(p->pDataBlock); taosArrayClear(p->pDataBlock);
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
taosArrayPush(pInfo->pBlockLists, &p); taosArrayPush(pInfo->pBlockLists, &p);
#endif
} }
pInfo->blockType = STREAM_INPUT__DATA_BLOCK; pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
} else { } else {
......
...@@ -1602,26 +1602,30 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1602,26 +1602,30 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} break; } break;
case STREAM_DELETE_DATA: { case STREAM_DELETE_DATA: {
printDataBlock(pBlock, "stream scan delete recv"); printDataBlock(pBlock, "stream scan delete recv");
SSDataBlock* pDelBlock = NULL;
if (pInfo->tqReader) { if (pInfo->tqReader) {
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA); pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
filterDelBlockByUid(pDelBlock, pBlock, pInfo); filterDelBlockByUid(pDelBlock, pBlock, pInfo);
pBlock = pDelBlock; } else {
pDelBlock = pBlock;
} }
printDataBlock(pBlock, "stream scan delete recv filtered");
if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) { if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
generateDeleteResultBlock(pInfo, pBlock, pInfo->pDeleteDataRes); generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT; pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
printDataBlock(pBlock, "stream scan delete result"); printDataBlock(pDelBlock, "stream scan delete result");
return pInfo->pDeleteDataRes; return pInfo->pDeleteDataRes;
} else { } else {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->updateResIndex = 0; pInfo->updateResIndex = 0;
generateScanRange(pInfo, pBlock, pInfo->pUpdateRes); generateScanRange(pInfo, pDelBlock, pInfo->pUpdateRes);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
printDataBlock(pBlock, "stream scan delete data"); printDataBlock(pDelBlock, "stream scan delete data");
if (pInfo->tqReader) {
blockDataDestroy(pDelBlock);
}
return pInfo->pDeleteDataRes; return pInfo->pDeleteDataRes;
} }
} break; } break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册