提交 a9f09461 编写于 作者: S slzhou

fix: restore table merge scan operator

上级 8171e326
...@@ -196,6 +196,7 @@ struct SVnodeCfg { ...@@ -196,6 +196,7 @@ struct SVnodeCfg {
typedef struct { typedef struct {
TSKEY lastKey; TSKEY lastKey;
uint64_t uid; uint64_t uid;
uint64_t groupId;
} STableKeyInfo; } STableKeyInfo;
struct SMetaEntry { struct SMetaEntry {
......
...@@ -2852,7 +2852,7 @@ int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) { ...@@ -2852,7 +2852,7 @@ int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
break; break;
} }
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, uid = id}; STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, uid = id, .groupId = 0};
taosArrayPush(list, &info); taosArrayPush(list, &info);
} }
......
...@@ -315,7 +315,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo ...@@ -315,7 +315,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
} }
for (int i = 0; i < taosArrayGetSize(res); i++) { for (int i = 0; i < taosArrayGetSize(res); i++) {
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i)}; STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i), .groupId = 0};
taosArrayPush(pListInfo->pTableList, &info); taosArrayPush(pListInfo->pTableList, &info);
} }
taosArrayDestroy(res); taosArrayDestroy(res);
...@@ -336,7 +336,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo ...@@ -336,7 +336,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
} }
} }
}else { // Create one table group. }else { // Create one table group.
STableKeyInfo info = {.lastKey = 0, .uid = tableUid}; STableKeyInfo info = {.lastKey = 0, .uid = tableUid, .groupId = 0};
taosArrayPush(pListInfo->pTableList, &info); taosArrayPush(pListInfo->pTableList, &info);
} }
pListInfo->pGroupList = taosArrayInit(4, POINTER_BYTES); pListInfo->pGroupList = taosArrayInit(4, POINTER_BYTES);
......
...@@ -4028,6 +4028,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, ...@@ -4028,6 +4028,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
int32_t len = (int32_t)(pStart - (char*)keyBuf); int32_t len = (int32_t)(pStart - (char*)keyBuf);
uint64_t groupId = calcGroupId(keyBuf, len); uint64_t groupId = calcGroupId(keyBuf, len);
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t)); taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t));
info->groupId = groupId;
groupNum++; groupNum++;
nodesClearList(groupNew); nodesClearList(groupNew);
...@@ -4127,7 +4128,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4127,7 +4128,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL; return NULL;
} }
} else { // Create one table group. } else { // Create one table group.
STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid}; STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid, .groupId = 0};
taosArrayPush(pTableListInfo->pTableList, &info); taosArrayPush(pTableListInfo->pTableList, &info);
} }
......
...@@ -1965,7 +1965,10 @@ _error: ...@@ -1965,7 +1965,10 @@ _error:
typedef struct STableMergeScanInfo { typedef struct STableMergeScanInfo {
STableListInfo* tableListInfo; STableListInfo* tableListInfo;
int32_t currentGroupId; int32_t tableStartIndex;
int32_t tableEndIndex;
bool hasGroupId;
uint64_t groupId;
SArray* dataReaders; // array of tsdbReaderT* SArray* dataReaders; // array of tsdbReaderT*
SReadHandle readHandle; SReadHandle readHandle;
...@@ -2006,7 +2009,7 @@ typedef struct STableMergeScanInfo { ...@@ -2006,7 +2009,7 @@ typedef struct STableMergeScanInfo {
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag; int32_t dataBlockLoadFlag;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded. // window to check if current data block needs to be loaded.
SSampleExecInfo sample; // sample execution info SSampleExecInfo sample; // sample execution info
} STableMergeScanInfo; } STableMergeScanInfo;
...@@ -2030,6 +2033,22 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle ...@@ -2030,6 +2033,22 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo,
int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, uint64_t queryId,
uint64_t taskId) {
for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) {
SArray* subTableList = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(subTableList, taosArrayGet(pTableListInfo->pTableList, i));
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, pQueryCond, subTableList, queryId, taskId);
taosArrayPush(arrayReader, &pReader);
taosArrayDestroy(subTableList);
}
return TSDB_CODE_SUCCESS;
}
// todo refactor // todo refactor
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo, static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) { int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {
...@@ -2216,34 +2235,32 @@ SArray* generateSortByTsInfo(int32_t order) { ...@@ -2216,34 +2235,32 @@ SArray* generateSortByTsInfo(int32_t order) {
return pList; return pList;
} }
static int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, SArray* tableList, SArray* arrayReader, uint64_t queryId,
uint64_t taskId) {
for (int32_t i = 0; i < taosArrayGetSize(tableList); ++i) {
SArray* tmp = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(tmp, taosArrayGet(tableList, i));
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, pQueryCond, tmp, queryId, taskId);
taosArrayPush(arrayReader, &pReader);
taosArrayDestroy(tmp);
}
return TSDB_CODE_SUCCESS;
}
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info; STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SArray* tableList = taosArrayGetP(pInfo->tableListInfo->pGroupList, pInfo->currentGroupId); {
size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
int32_t i = pInfo->tableStartIndex + 1;
for (; i < tableListSize; ++i) {
STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i);
if (tableKeyInfo->groupId != pInfo->groupId) {
break;
}
}
pInfo->tableEndIndex = i - 1;
}
int32_t tableStartIdx = pInfo->tableStartIndex;
int32_t tableEndIdx = pInfo->tableEndIndex;
createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableList, STableListInfo* tableListInfo = pInfo->tableListInfo;
createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableListInfo, tableStartIdx, tableEndIdx,
pInfo->dataReaders, pInfo->queryId, pInfo->taskId); pInfo->dataReaders, pInfo->queryId, pInfo->taskId);
// todo the total available buffer should be determined by total capacity of buffer of this task. // todo the total available buffer should be determined by total capacity of buffer of this task.
// the additional one is reserved for merge result // the additional one is reserved for merge result
int32_t tableLen = taosArrayGetSize(tableList); pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
pInfo->sortBufSize = pInfo->bufPageSize * ((tableLen==0?1:tableLen) + 1);
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str); pInfo->pSortInputBlock, pTaskInfo->id.str);
...@@ -2330,43 +2347,38 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { ...@@ -2330,43 +2347,38 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code); longjmp(pTaskInfo->env, code);
} }
size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
if (!pInfo->hasGroupId) {
pInfo->hasGroupId = true;
if (pInfo->currentGroupId == -1) { if (tableListSize == 0) {
pInfo->currentGroupId++;
if (pInfo->currentGroupId >= taosArrayGetSize(pInfo->tableListInfo->pGroupList)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
return NULL; return NULL;
} }
pInfo->tableStartIndex = 0;
pInfo->groupId = ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId;
startGroupTableMergeScan(pOperator); startGroupTableMergeScan(pOperator);
} }
SSDataBlock* pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator); SSDataBlock* pBlock = NULL;
if (pBlock != NULL) { while (pInfo->tableStartIndex < tableListSize) {
uint64_t* groupId = taosHashGet(pInfo->tableListInfo->map, &(pBlock->info.uid), sizeof(uint64_t)); pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
if(groupId) pBlock->info.groupId = *groupId; if (pBlock != NULL) {
pBlock->info.groupId = pInfo->groupId;
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
return pBlock; return pBlock;
} } else {
stopGroupTableMergeScan(pOperator);
stopGroupTableMergeScan(pOperator); if (pInfo->tableEndIndex >= tableListSize - 1) {
pInfo->currentGroupId++; doSetOperatorCompleted(pOperator);
if (pInfo->currentGroupId >= taosArrayGetSize(pInfo->tableListInfo->pGroupList)) { break;
doSetOperatorCompleted(pOperator); }
return NULL; pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
} pInfo->groupId =
startGroupTableMergeScan(pOperator); ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId;
startGroupTableMergeScan(pOperator);
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator); }
if (pBlock != NULL) {
uint64_t* groupId = taosHashGet(pInfo->tableListInfo->map, &(pBlock->info.uid), sizeof(uint64_t));
if(groupId) pBlock->info.groupId = *groupId;
pOperator->resultInfo.totalRows += pBlock->info.rows;
return pBlock;
} }
doSetOperatorCompleted(pOperator);
return pBlock; return pBlock;
} }
...@@ -2403,6 +2415,12 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla ...@@ -2403,6 +2415,12 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) {
const STableKeyInfo* info1 = p1;
const STableKeyInfo* info2 = p2;
return info1->groupId - info2->groupId;
}
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo, SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo,
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo, uint64_t queryId, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo, uint64_t queryId,
uint64_t taskId) { uint64_t taskId) {
...@@ -2411,6 +2429,9 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -2411,6 +2429,9 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
goto _error; goto _error;
} }
if (pTableScanNode->pPartitionTags) {
taosArraySort(pTableListInfo->pTableList, compareTableKeyInfoByGid);
}
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
...@@ -2443,7 +2464,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -2443,7 +2464,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES); pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES);
pInfo->queryId = queryId; pInfo->queryId = queryId;
pInfo->taskId = taskId; pInfo->taskId = taskId;
pInfo->currentGroupId = -1;
pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam)); pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册