未验证 提交 99c58a66 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #13920 from taosdata/feature/3_liaohj

refactor(query): set correct buffer pages.
...@@ -2170,25 +2170,17 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa ...@@ -2170,25 +2170,17 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa
blockDataEnsureCapacity(p, capacity); blockDataEnsureCapacity(p, capacity);
while (1) { while (1) {
STupleHandle* pTupleHandle = NULL; STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
if (pInfo->prefetchedTuple == NULL) {
pTupleHandle = tsortNextTuple(pHandle);
} else {
pTupleHandle = pInfo->prefetchedTuple;
}
if (pTupleHandle == NULL) { if (pTupleHandle == NULL) {
break; break;
} }
appendOneRowToDataBlock(p, pTupleHandle); appendOneRowToDataBlock(p, pTupleHandle);
if (p->info.rows >= capacity) { if (p->info.rows >= capacity) {
break; break;
} }
} }
qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), p->info.rows); qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), p->info.rows);
return (p->info.rows > 0) ? p : NULL; return (p->info.rows > 0) ? p : NULL;
} }
...@@ -2261,7 +2253,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -2261,7 +2253,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo)); STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
goto _error; goto _error;
} }
...@@ -2284,17 +2275,16 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -2284,17 +2275,16 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
pInfo->readHandle = *readHandle; pInfo->readHandle = *readHandle;
pInfo->interval = extractIntervalInfo(pTableScanNode); pInfo->interval = extractIntervalInfo(pTableScanNode);
pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.sampleRatio = pTableScanNode->ratio;
pInfo->sample.seed = taosGetTimestampSec(); pInfo->sample.seed = taosGetTimestampSec();
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; pInfo->dataReaders = dataReaders;
pInfo->dataReaders = dataReaders; pInfo->scanFlag = MAIN_SCAN;
pInfo->scanFlag = MAIN_SCAN; pInfo->pColMatchInfo = pColList;
pInfo->pColMatchInfo = pColList; pInfo->curTWinIdx = 0;
pInfo->curTWinIdx = 0;
pInfo->pResBlock = createResDataBlock(pDescNode); pInfo->pResBlock = createResDataBlock(pDescNode);
...@@ -2307,22 +2297,27 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -2307,22 +2297,27 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
taosArrayPush(pInfo->sortSourceParams, param); taosArrayPush(pInfo->sortSourceParams, param);
taosMemoryFree(param); taosMemoryFree(param);
} }
pInfo->pSortInfo = generateSortByTsInfo(pInfo->cond.order); pInfo->pSortInfo = generateSortByTsInfo(pInfo->cond.order);
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
int32_t rowSize = pInfo->pResBlock->info.rowSize; int32_t rowSize = pInfo->pResBlock->info.rowSize;
int32_t blockMetaSize = (int32_t)blockDataGetSerialMetaSize(pInfo->pResBlock->info.numOfCols); pInfo->bufPageSize = getProperSortPageSize(rowSize);
pInfo->bufPageSize = (rowSize * 2 + blockMetaSize) < 1024 ? 1024 : (rowSize * 2 + blockMetaSize);
pInfo->sortBufSize = pInfo->bufPageSize * 16; // todo the total available buffer should be determined by total capacity of buffer of this task.
pInfo->hasGroupId = false; // the additional one is reserved for merge result
pInfo->sortBufSize = pInfo->bufPageSize * (taosArrayGetSize(dataReaders) + 1);
pInfo->hasGroupId = false;
pInfo->prefetchedTuple = NULL; pInfo->prefetchedTuple = NULL;
pOperator->name = "TableMergeScanOperator"; pOperator->name = "TableMergeScanOperator";
// TODO : change it
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfExprs = numOfCols; pOperator->numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
initResultSizeInfo(pOperator, 1024); initResultSizeInfo(pOperator, 1024);
pOperator->fpSet = pOperator->fpSet =
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册