未验证 提交 1047da04 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #14135 from taosdata/szhou/feature/sort-group-2

feat: add multiple group tsdb reads to table merge scan
......@@ -1919,9 +1919,10 @@ typedef struct STableMergeScanInfo {
int32_t tableStartIndex;
int32_t tableEndIndex;
bool hasGroupId;
uint64_t groupId;
SArray* dataReaders; // array of tsdbReaderT*
SReadHandle readHandle;
SArray* dataReaders; // array of tsdbReaderT*
SReadHandle readHandle;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
......@@ -2218,6 +2219,18 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
{
size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
int32_t i = pInfo->tableStartIndex + 1;
for (; i < tableListSize; ++i) {
STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i);
if (tableKeyInfo->groupId != pInfo->groupId) {
break;
}
}
pInfo->tableEndIndex = i - 1;
}
int32_t tableStartIdx = pInfo->tableStartIndex;
int32_t tableEndIdx = pInfo->tableEndIndex;
......@@ -2314,20 +2327,38 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
if (!pInfo->hasGroupId) {
pInfo->hasGroupId = true;
if (tableListSize == 0) {
doSetOperatorCompleted(pOperator);
return NULL;
}
pInfo->tableStartIndex = 0;
pInfo->tableEndIndex = taosArrayGetSize(pInfo->tableListInfo->pTableList) - 1;
pInfo->groupId = ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId;
startGroupTableMergeScan(pOperator);
}
SSDataBlock* pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
} else {
stopGroupTableMergeScan(pOperator);
doSetOperatorCompleted(pOperator);
SSDataBlock* pBlock = NULL;
while (pInfo->tableStartIndex < tableListSize) {
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
if (pBlock != NULL) {
pBlock->info.groupId = pInfo->groupId;
pOperator->resultInfo.totalRows += pBlock->info.rows;
return pBlock;
} else {
stopGroupTableMergeScan(pOperator);
if (pInfo->tableEndIndex >= tableListSize - 1) {
doSetOperatorCompleted(pOperator);
break;
}
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
pInfo->groupId =
((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId;
startGroupTableMergeScan(pOperator);
}
}
return pBlock;
}
......@@ -2335,7 +2366,6 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
cleanupQueryTableDataCond(&pTableScanInfo->cond);
if (pTableScanInfo->pColMatchInfo != NULL) {
taosArrayDestroy(pTableScanInfo->pColMatchInfo);
}
......@@ -2424,8 +2454,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
initResultSizeInfo(pOperator, 1024);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo,
NULL, NULL, getTableMergeScanExplainExecInfo);
createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, NULL,
NULL, getTableMergeScanExplainExecInfo);
pOperator->cost.openCost = 0;
return pOperator;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册