diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 70d89102e67dd933a6228402f647a212d953f63a..d5d6f7c58a0db3a39a72c29ce38721759333293b 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -195,6 +195,7 @@ struct SVnodeCfg { typedef struct { TSKEY lastKey; uint64_t uid; + uint64_t groupId; } STableKeyInfo; struct SMetaEntry { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 19d7c2b03d01cf05cd528cce7f84d8440f5161d0..ce39b35aaa59354a5aa066d7f9c1af4f15760758 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2845,7 +2845,7 @@ int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) { break; } - STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, uid = id}; + STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, uid = id, .groupId = 0}; taosArrayPush(list, &info); } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 51281482219ae31fb0766ab45c0d35be68941789..654cf681e6957e1b4e39b709c59cadfc1f01aa42 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -838,14 +838,19 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); -int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, +int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, + STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, + SNode* pTagCond); +int32_t doCreateMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId, - uint64_t taskId, SNode* pTagCond); + uint64_t taskId); SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SArray* dataReaders, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); +int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SArray* groupKey); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 566abb14515cd02904183a14aa3b5f34cb6181a8..891ffec09bda1e6a1afe9bbcf6c7413bc7d9fc24 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -239,7 +239,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo } for (int i = 0; i < taosArrayGetSize(res); i++) { - STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i)}; + STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i), .groupId = 0}; taosArrayPush(pListInfo->pTableList, &info); } taosArrayDestroy(res); @@ -247,7 +247,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo code = tsdbGetAllTableList(metaHandle, tableUid, pListInfo->pTableList); } } else { // Create one table group. - STableKeyInfo info = {.lastKey = 0, .uid = tableUid}; + STableKeyInfo info = {.lastKey = 0, .uid = tableUid, .groupId = 0}; taosArrayPush(pListInfo->pTableList, &info); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 097b8b9fb8223483abaee795fc3b5e77312cf2aa..828da0bcf89f5023b7f8427ba6bc888e15d5c3c2 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3968,14 +3968,16 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, } } } - int32_t len = (int32_t)(pStart - (char*)keyBuf); - uint64_t* groupId = taosHashGet(pTableListInfo->map, keyBuf, len); - if (groupId) { - taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), groupId, sizeof(uint64_t)); - } else { + + uint64_t* pGroupId = taosHashGet(pTableListInfo->map, keyBuf, len); + + if (!pGroupId) { uint64_t tmpId = calcGroupId(keyBuf, len); + info->groupId = tmpId; taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &tmpId, sizeof(uint64_t)); + } else { + info->groupId = *pGroupId; } metaReaderClear(&mr); @@ -4023,11 +4025,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; SArray* dataReaders = taosArrayInit(8, POINTER_BYTES); - createMultipleDataReaders(pTableScanNode, pHandle, pTableListInfo, dataReaders, queryId, taskId, pTagCond); + createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId, pTagCond); + doCreateMultipleDataReaders(pTableScanNode, pHandle, pTableListInfo, dataReaders, queryId, taskId); + extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); - SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags); - generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json - taosArrayDestroy(groupKeys); + SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, dataReaders, pHandle, pTaskInfo); STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; @@ -4097,7 +4099,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } } else { // Create one table group. - STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid}; + STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid, .groupId = 0}; taosArrayPush(pTableListInfo->pTableList, &info); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 39a62410f9fe02730dc3c38fb3ffb618398b6e59..62167ec9ffe97be617237a0c458eb7e4fc5e54ed 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -597,12 +597,12 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pInfo->dataReader = pReadHandle; // pInfo->prevGroupId = -1; - pOperator->name = "TableSeqScanOperator"; + pOperator->name = "TableSeqScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL); return pOperator; @@ -616,7 +616,7 @@ static int32_t doGetTableRowSize(void* pMeta, uint64_t uid) { metaGetTableEntryByUid(&mr, uid); if (mr.me.type == TSDB_SUPER_TABLE) { int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols; - for(int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < numOfCols; ++i) { rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes; } } else if (mr.me.type == TSDB_CHILD_TABLE) { @@ -624,12 +624,12 @@ static int32_t doGetTableRowSize(void* pMeta, uint64_t uid) { metaGetTableEntryByUid(&mr, suid); int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols; - for(int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < numOfCols; ++i) { rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes; } } else if (mr.me.type == TSDB_NORMAL_TABLE) { int32_t numOfCols = mr.me.ntbEntry.schemaRow.nCols; - for(int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < numOfCols; ++i) { rowLen += mr.me.ntbEntry.schemaRow.pSchema[i].bytes; } } @@ -653,7 +653,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { SSDataBlock* pBlock = pBlockScanInfo->pResBlock; - int32_t slotId = pOperator->exprSupp.pExprInfo->base.resSchema.slotId; + int32_t slotId = pOperator->exprSupp.pExprInfo->base.resSchema.slotId; SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, slotId); int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo); @@ -685,23 +685,23 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re goto _error; } - pInfo->pHandle = dataReader; + pInfo->pHandle = dataReader; pInfo->readHandle = *readHandle; - pInfo->uid = uid; - pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc); + pInfo->uid = uid; + pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc); - int32_t numOfCols = 0; + int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols); - int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols); + int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols); if (code != TSDB_CODE_SUCCESS) { goto _error; } - pOperator->name = "DataBlockDistScanOperator"; + pOperator->name = "DataBlockDistScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, @@ -1878,26 +1878,25 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs); SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID); - int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs); if (code != TSDB_CODE_SUCCESS) { goto _error; } - pInfo->pTableList = pTableListInfo; - pInfo->pColMatchInfo = colList; - pInfo->pRes = createResDataBlock(pDescNode); - pInfo->readHandle = *pReadHandle; - pInfo->curPos = 0; - pInfo->pFilterNode = pPhyNode->node.pConditions; + pInfo->pTableList = pTableListInfo; + pInfo->pColMatchInfo = colList; + pInfo->pRes = createResDataBlock(pDescNode); + pInfo->readHandle = *pReadHandle; + pInfo->curPos = 0; + pInfo->pFilterNode = pPhyNode->node.pConditions; pOperator->name = "TagScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; initResultSizeInfo(pOperator, 4096); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); @@ -1963,25 +1962,42 @@ typedef struct STableMergeScanInfo { } STableMergeScanInfo; -int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, - STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId, - uint64_t taskId, SNode* pTagCond) { +int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) { + const STableKeyInfo* info1 = p1; + const STableKeyInfo* info2 = p2; + return info1->groupId - info2->groupId; +} + +int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, + STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, + SNode* pTagCond) { int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond); if (code != TSDB_CODE_SUCCESS) { - goto _error; + return code; } if (taosArrayGetSize(pTableListInfo->pTableList) == 0) { qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId); - goto _error; + return TSDB_CODE_SUCCESS; + } + SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags); + generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json + if (groupKeys) { + taosArraySort(pTableListInfo->pTableList, compareTableKeyInfoByGid); } + taosArrayDestroy(groupKeys); + return TSDB_CODE_SUCCESS; +} + +int32_t doCreateMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, + STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId, + uint64_t taskId) { SQueryTableDataCond cond = {0}; - code = initQueryTableDataCond(&cond, pTableScanNode); + int32_t code = initQueryTableDataCond(&cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { goto _error; } - // TODO: free the sublist info and the table list in it for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); ++i) { STableListInfo* subListInfo = taosMemoryCalloc(1, sizeof(subListInfo)); subListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo)); @@ -1995,7 +2011,7 @@ int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHand } cleanupQueryTableDataCond(&cond); - return 0; + return TSDB_CODE_SUCCESS; _error: return code;