未验证 提交 d88615f9 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #14132 from taosdata/szhou/feature/sort-group-2

feat: add multiple group tsdb reads to table merge scan
...@@ -844,8 +844,8 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle ...@@ -844,8 +844,8 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle
int32_t doCreateMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, int32_t doCreateMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId, STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId,
uint64_t taskId); uint64_t taskId);
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SArray* dataReaders, SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo *pTableListInfo,
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); SReadHandle* readHandle, SExecTaskInfo* pTaskInfo, uint64_t queryId, uint64_t taskId);
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
......
...@@ -4023,14 +4023,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4023,14 +4023,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return pOperator; return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
SArray* dataReaders = taosArrayInit(8, POINTER_BYTES);
createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId, pTagCond); createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId, pTagCond);
doCreateMultipleDataReaders(pTableScanNode, pHandle, pTableListInfo, dataReaders, queryId, taskId);
extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo, queryId, taskId);
SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, dataReaders, pHandle, pTaskInfo);
STableScanInfo* pScanInfo = pOperator->info; STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
return pOperator; return pOperator;
......
...@@ -1914,8 +1914,13 @@ _error: ...@@ -1914,8 +1914,13 @@ _error:
} }
typedef struct STableMergeScanInfo { typedef struct STableMergeScanInfo {
SArray* dataReaders; // array of tsdbReaderT* STableListInfo* tableListInfo;
SReadHandle readHandle; int32_t tableStartIndex;
int32_t tableEndIndex;
bool hasGroupId;
SArray* dataReaders; // array of tsdbReaderT*
SReadHandle readHandle;
int32_t bufPageSize; int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort uint32_t sortBufSize; // max buffer size for in-memory sort
...@@ -1926,11 +1931,9 @@ typedef struct STableMergeScanInfo { ...@@ -1926,11 +1931,9 @@ typedef struct STableMergeScanInfo {
SSDataBlock* pSortInputBlock; SSDataBlock* pSortInputBlock;
int64_t startTs; // sort start time int64_t startTs; // sort start time
bool hasGroupId; SArray* sortSourceParams;
uint64_t groupId; uint64_t queryId;
STupleHandle* prefetchedTuple; uint64_t taskId;
SArray* sortSourceParams;
SFileBlockLoadRecorder readRecorder; SFileBlockLoadRecorder readRecorder;
int64_t numOfRows; int64_t numOfRows;
...@@ -1958,8 +1961,6 @@ typedef struct STableMergeScanInfo { ...@@ -1958,8 +1961,6 @@ typedef struct STableMergeScanInfo {
// window to check if current data block needs to be loaded. // window to check if current data block needs to be loaded.
SSampleExecInfo sample; // sample execution info SSampleExecInfo sample; // sample execution info
int32_t curTWinIdx;
} STableMergeScanInfo; } STableMergeScanInfo;
int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) { int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) {
...@@ -1969,8 +1970,7 @@ int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) { ...@@ -1969,8 +1970,7 @@ int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) {
} }
int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) {
SNode* pTagCond) {
int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond); int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -1990,11 +1990,10 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle ...@@ -1990,11 +1990,10 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle
} }
int32_t doCreateMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, int32_t doCreateMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId, STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId,
uint64_t taskId) { uint64_t taskId) {
SQueryTableDataCond cond = {0}; SQueryTableDataCond cond = {0};
int32_t code = initQueryTableDataCond(&cond, pTableScanNode); int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -2017,6 +2016,24 @@ _error: ...@@ -2017,6 +2016,24 @@ _error:
return code; return code;
} }
int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo,
int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, uint64_t queryId,
uint64_t taskId) {
for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) {
STableListInfo* subListInfo = taosMemoryCalloc(1, sizeof(subListInfo));
subListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(subListInfo->pTableList, taosArrayGet(pTableListInfo->pTableList, i));
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, pQueryCond, subListInfo, queryId, taskId);
taosArrayPush(arrayReader, &pReader);
taosArrayDestroy(subListInfo->pTableList);
taosMemoryFree(subListInfo);
}
return TSDB_CODE_SUCCESS;
}
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo, static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) { int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
...@@ -2196,22 +2213,35 @@ SArray* generateSortByTsInfo(int32_t order) { ...@@ -2196,22 +2213,35 @@ SArray* generateSortByTsInfo(int32_t order) {
return pList; return pList;
} }
int32_t doOpenTableMergeScanOperator(SOperatorInfo* pOperator) { int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info; STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (OPTR_IS_OPENED(pOperator)) { int32_t tableStartIdx = pInfo->tableStartIndex;
return TSDB_CODE_SUCCESS; int32_t tableEndIdx = pInfo->tableEndIndex;
}
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; STableListInfo* tableListInfo = pInfo->tableListInfo;
createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableListInfo, tableStartIdx, tableEndIdx,
pInfo->dataReaders, pInfo->queryId, pInfo->taskId);
// todo the total available buffer should be determined by total capacity of buffer of this task.
// the additional one is reserved for merge result
pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str); pInfo->pSortInputBlock, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlock, NULL, NULL); tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlock, NULL, NULL);
size_t numReaders = taosArrayGetSize(pInfo->dataReaders); size_t numReaders = taosArrayGetSize(pInfo->dataReaders);
for (int32_t i = 0; i < numReaders; ++i) {
STableMergeScanSortSourceParam param = {0};
param.readerIdx = i;
param.pOperator = pOperator;
param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
taosArrayPush(pInfo->sortSourceParams, &param);
}
for (int32_t i = 0; i < numReaders; ++i) { for (int32_t i = 0; i < numReaders; ++i) {
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i); STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i);
...@@ -2225,9 +2255,22 @@ int32_t doOpenTableMergeScanOperator(SOperatorInfo* pOperator) { ...@@ -2225,9 +2255,22 @@ int32_t doOpenTableMergeScanOperator(SOperatorInfo* pOperator) {
longjmp(pTaskInfo->env, terrno); longjmp(pTaskInfo->env, terrno);
} }
pOperator->status = OP_RES_TO_RETURN; return TSDB_CODE_SUCCESS;
}
int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
tsortDestroySortHandle(pInfo->pSortHandle);
taosArrayClear(pInfo->sortSourceParams);
for (int32_t i = 0; i < taosArrayGetSize(pInfo->dataReaders); ++i) {
tsdbReaderT* reader = taosArrayGetP(pInfo->dataReaders, i);
tsdbCleanupReadHandle(reader);
}
taosArrayDestroy(pInfo->dataReaders);
OPTR_SET_OPENED(pOperator);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2270,12 +2313,18 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -2270,12 +2313,18 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code); longjmp(pTaskInfo->env, code);
} }
if (!pInfo->hasGroupId) {
pInfo->hasGroupId = true;
pInfo->tableStartIndex = 0;
pInfo->tableEndIndex = taosArrayGetSize(pInfo->tableListInfo->pTableList) - 1;
startGroupTableMergeScan(pOperator);
}
SSDataBlock* pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator); SSDataBlock* pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
if (pBlock != NULL) { if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
} else { } else {
stopGroupTableMergeScan(pOperator);
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
return pBlock; return pBlock;
...@@ -2285,17 +2334,11 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -2285,17 +2334,11 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
cleanupQueryTableDataCond(&pTableScanInfo->cond); cleanupQueryTableDataCond(&pTableScanInfo->cond);
for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) {
tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, i);
tsdbCleanupReadHandle(reader);
}
taosArrayDestroy(pTableScanInfo->dataReaders);
if (pTableScanInfo->pColMatchInfo != NULL) { if (pTableScanInfo->pColMatchInfo != NULL) {
taosArrayDestroy(pTableScanInfo->pColMatchInfo); taosArrayDestroy(pTableScanInfo->pColMatchInfo);
} }
taosArrayDestroy(pTableScanInfo->sortSourceParams);
pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock); pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock); pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
...@@ -2321,8 +2364,9 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla ...@@ -2321,8 +2364,9 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SArray* dataReaders, SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo,
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { SReadHandle* readHandle, SExecTaskInfo* pTaskInfo, uint64_t queryId,
uint64_t taskId) {
STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo)); STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -2352,22 +2396,16 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -2352,22 +2396,16 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->sample.seed = taosGetTimestampSec(); pInfo->sample.seed = taosGetTimestampSec();
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
pInfo->dataReaders = dataReaders; pInfo->tableListInfo = pTableListInfo;
pInfo->scanFlag = MAIN_SCAN; pInfo->scanFlag = MAIN_SCAN;
pInfo->pColMatchInfo = pColList; pInfo->pColMatchInfo = pColList;
pInfo->curTWinIdx = 0;
pInfo->pResBlock = createResDataBlock(pDescNode); pInfo->pResBlock = createResDataBlock(pDescNode);
pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES);
pInfo->queryId = queryId;
pInfo->taskId = taskId;
pInfo->sortSourceParams = taosArrayInit(taosArrayGetSize(dataReaders), sizeof(STableMergeScanSortSourceParam)); pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
for (int32_t i = 0; i < taosArrayGetSize(dataReaders); ++i) {
STableMergeScanSortSourceParam* param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam));
param->readerIdx = i;
param->pOperator = pOperator;
param->inputBlock = createOneDataBlock(pInfo->pResBlock, false);
taosArrayPush(pInfo->sortSourceParams, param);
taosMemoryFree(param);
}
pInfo->pSortInfo = generateSortByTsInfo(pInfo->cond.order); pInfo->pSortInfo = generateSortByTsInfo(pInfo->cond.order);
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
...@@ -2375,14 +2413,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -2375,14 +2413,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
int32_t rowSize = pInfo->pResBlock->info.rowSize; int32_t rowSize = pInfo->pResBlock->info.rowSize;
pInfo->bufPageSize = getProperSortPageSize(rowSize); pInfo->bufPageSize = getProperSortPageSize(rowSize);
// todo the total available buffer should be determined by total capacity of buffer of this task.
// the additional one is reserved for merge result
pInfo->sortBufSize = pInfo->bufPageSize * (taosArrayGetSize(dataReaders) + 1);
pInfo->hasGroupId = false;
pInfo->prefetchedTuple = NULL;
pOperator->name = "TableMergeScanOperator"; pOperator->name = "TableMergeScanOperator";
// TODO : change it
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
...@@ -2392,7 +2423,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -2392,7 +2423,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
initResultSizeInfo(pOperator, 1024); initResultSizeInfo(pOperator, 1024);
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(doOpenTableMergeScanOperator, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo,
NULL, NULL, getTableMergeScanExplainExecInfo); NULL, NULL, getTableMergeScanExplainExecInfo);
pOperator->cost.openCost = 0; pOperator->cost.openCost = 0;
return pOperator; return pOperator;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册