diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 84d503b972bde790ad3a65cdcf3e592a8e970071..1bf6ca8dab7d6a4f8647bf4c553d4da8e035df89 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -317,12 +317,16 @@ typedef struct STagScanInfo { typedef struct SLastrowScanInfo { SSDataBlock *pRes; - SArray *pTableList; SReadHandle readHandle; void *pLastrowReader; SArray *pColMatchInfo; int32_t *pSlotIds; SExprSupp pseudoExprSup; + int32_t retrieveType; + int32_t currentGroupIndex; + SSDataBlock *pBufferredRes; + SArray *pUidList; + int32_t indexOfBufferedRes; } SLastrowScanInfo; typedef enum EStreamScanMode { @@ -826,8 +830,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, - SArray* pTableList, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, @@ -945,8 +948,9 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); -int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, +int32_t createScanTableListInfo(SScanPhysiNode *pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId); + SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo *pTableListInfo, diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 9034397d0f5f6e1e0cb632ee67cc4d4698b39e2f..78a7e58ee111cb2c22ea8b2736730936c5dca8ce 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -30,15 +30,13 @@ static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator); static void destroyLastrowScanOperator(void* param, int32_t numOfOutput); static int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds); -SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SArray* pTableList, - SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->pTableList = pTableList; pInfo->readHandle = *readHandle; pInfo->pRes = createResDataBlock(pScanNode->scan.node.pOutputDataBlockDesc); @@ -50,8 +48,22 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead goto _error; } - tsdbLastRowReaderOpen(readHandle->vnode, LASTROW_RETRIEVE_TYPE_SINGLE, pTableList, taosArrayGetSize(pInfo->pColMatchInfo), - &pInfo->pLastrowReader); + STableListInfo* pTableList = &pTaskInfo->tableqinfoList; + + initResultSizeInfo(pOperator, 1024); + blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); + pInfo->pUidList = taosArrayInit(4, sizeof(int64_t)); + + // partition by tbname + if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) { + pInfo->retrieveType = LASTROW_RETRIEVE_TYPE_ALL; + tsdbLastRowReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList, + taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader); + pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false); + blockDataEnsureCapacity(pInfo->pBufferredRes, pOperator->resultInfo.capacity); + } else { // by tags + pInfo->retrieveType = LASTROW_RETRIEVE_TYPE_SINGLE; + } if (pScanNode->scan.pScanPseudoCols != NULL) { SExprSupp* pPseudoExpr = &pInfo->pseudoExprSup; @@ -60,19 +72,17 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead pPseudoExpr->pCtx = createSqlFunctionCtx(pPseudoExpr->pExprInfo, pPseudoExpr->numOfExprs, &pPseudoExpr->rowEntryInfoOffset); } - pOperator->name = "LastrowScanOperator"; + pOperator->name = "LastrowScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); - initResultSizeInfo(pOperator, 1024); - blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doScanLastrow, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL); + pOperator->cost.openCost = 0; return pOperator; @@ -90,43 +100,105 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { SLastrowScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - - int32_t size = taosArrayGetSize(pInfo->pTableList); + STableListInfo* pTableList = &pTaskInfo->tableqinfoList; + int32_t size = taosArrayGetSize(pTableList->pTableList); if (size == 0) { - setTaskStatus(pTaskInfo, TASK_COMPLETED); + doSetOperatorCompleted(pOperator); return NULL; } + blockDataCleanup(pInfo->pRes); + // check if it is a group by tbname - if (size == taosArrayGetSize(pInfo->pTableList)) { - blockDataCleanup(pInfo->pRes); - SArray* pUidList = taosArrayInit(1, sizeof(tb_uid_t)); - int32_t code = tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pUidList); - if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + if (pInfo->retrieveType == LASTROW_RETRIEVE_TYPE_ALL) { + if (pInfo->indexOfBufferedRes >= pInfo->pBufferredRes->info.rows) { + blockDataCleanup(pInfo->pBufferredRes); + taosArrayClear(pInfo->pUidList); + + int32_t code = tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, pInfo->pUidList); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } + + // check for tag values + int32_t resultRows = pInfo->pBufferredRes->info.rows; + ASSERT(resultRows == taosArrayGetSize(pInfo->pUidList)); + pInfo->indexOfBufferedRes = 0; + } + + if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) { + for(int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) { + SColMatchInfo* pMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i); + int32_t slotId = pMatchInfo->targetSlotId; + + SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId); + SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, slotId); + + char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes); + bool isNull = colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes); + colDataAppend(pDst, 0, p, isNull); + } + + if (pInfo->pseudoExprSup.numOfExprs > 0) { + SExprSupp* pSup = &pInfo->pseudoExprSup; + addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, + GET_TASKID(pTaskInfo)); + } + + pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes); + int64_t* groupId = taosHashGet(pTableList->map, &pInfo->pRes->info.uid, sizeof(int64_t)); + pInfo->pRes->info.groupId = *groupId; + + pInfo->indexOfBufferedRes += 1; + pInfo->pRes->info.rows = 1; + return pInfo->pRes; + } else { + doSetOperatorCompleted(pOperator); + return NULL; } + } else { + size_t totalGroups = taosArrayGetSize(pTableList->pGroupList); + + while (pInfo->currentGroupIndex < totalGroups) { + SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex); + + tsdbLastRowReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList, + taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader); + taosArrayClear(pInfo->pUidList); + + int32_t code = tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } - // check for tag values - if (pInfo->pRes->info.rows > 0 && pInfo->pseudoExprSup.numOfExprs > 0) { - SExprSupp* pSup = &pInfo->pseudoExprSup; - pInfo->pRes->info.uid = *(tb_uid_t*) taosArrayGet(pUidList, 0); - addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, GET_TASKID(pTaskInfo)); + pInfo->currentGroupIndex += 1; + + // check for tag values + if (pInfo->pRes->info.rows > 0) { + if (pInfo->pseudoExprSup.numOfExprs > 0) { + SExprSupp* pSup = &pInfo->pseudoExprSup; + pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0); + + STableKeyInfo* pKeyInfo = taosArrayGet(pGroupTableList, 0); + pInfo->pRes->info.groupId = pKeyInfo->groupId; + + addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, + GET_TASKID(pTaskInfo)); + } + + tsdbLastrowReaderClose(pInfo->pLastrowReader); + return pInfo->pRes; + } } doSetOperatorCompleted(pOperator); - return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; - } else { - // todo fetch the result for each group + return NULL; } - - return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; } void destroyLastrowScanOperator(void* param, int32_t numOfOutput) { SLastrowScanInfo* pInfo = (SLastrowScanInfo*)param; blockDataDestroy(pInfo->pRes); - tsdbLastrowReaderClose(pInfo->pLastrowReader); - taosMemoryFreeClear(param); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 89542571ea2cb02ab5ebb1802e87ecd7bfd51825..cecd80123b5346b154a22162e5622e4bdd094476 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -516,6 +516,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt // NOTE: the last parameter is the primary timestamp column if (fmIsTimelineFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) { pInput->pPTS = pInput->pData[j]; + ASSERT(pInput->pPTS->info.type == TSDB_DATA_TYPE_TIMESTAMP); } ASSERT(pInput->pData[j] != NULL); } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { @@ -4291,6 +4292,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, } } } + int32_t len = (int32_t)(pStart - (char*)keyBuf); uint64_t groupId = calcGroupId(keyBuf, len); taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &groupId, sizeof(uint64_t)); @@ -4309,6 +4311,30 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, return TDB_CODE_SUCCESS; } +static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) { + memset(pCond, 0, sizeof(SQueryTableDataCond)); + + pCond->order = TSDB_ORDER_ASC; + pCond->numOfCols = 1; + pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo)); + if (pCond->colList == NULL) { + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return terrno; + } + + pCond->colList->colId = 1; + pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP; + pCond->colList->bytes = sizeof(TSKEY); + + pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; + pCond->suid = uid; + pCond->type = BLOCK_LOAD_OFFSET_ORDER; + pCond->startVersion = -1; + pCond->endVersion = -1; + + return TSDB_CODE_SUCCESS; +} + SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo, const char* pUser) { @@ -4318,7 +4344,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId); + int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, + pTableScanNode->groupSort, pHandle, pTableListInfo, queryId, taskId); if (code) { pTaskInfo->code = code; return NULL; @@ -4337,7 +4364,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; - int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId); + int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, + pTableScanNode->groupSort, pHandle, pTableListInfo, queryId, taskId); if (code) { pTaskInfo->code = code; return NULL; @@ -4366,7 +4394,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo .maxTs = INT64_MIN, }; if (pHandle) { - int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId); + int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, + pTableScanNode->groupSort, pHandle, pTableListInfo, queryId, taskId); if (code) { pTaskInfo->code = code; return NULL; @@ -4406,25 +4435,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } SQueryTableDataCond cond = {0}; - - { - cond.order = TSDB_ORDER_ASC; - cond.numOfCols = 1; - cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo)); - if (cond.colList == NULL) { - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; - } - - cond.colList->colId = 1; - cond.colList->type = TSDB_DATA_TYPE_TIMESTAMP; - cond.colList->bytes = sizeof(TSKEY); - - cond.twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; - cond.suid = pBlockNode->suid; - cond.type = BLOCK_LOAD_OFFSET_ORDER; - cond.startVersion = -1; - cond.endVersion = -1; + int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond); + if (code != TSDB_CODE_SUCCESS) { + return NULL; } STsdbReader* pReader = NULL; @@ -4435,31 +4448,20 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; - // int32_t code = createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId); - // if (code) { - // pTaskInfo->code = code; - // return NULL; - // } - - int32_t code = extractTableSchemaInfo(pHandle, pScanNode->scan.uid, pTaskInfo); + int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, + queryId, taskId); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; return NULL; } - pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); - if (pScanNode->scan.tableType == TSDB_SUPER_TABLE) { - code = vnodeGetAllTableList(pHandle->vnode, pScanNode->scan.uid, pTableListInfo->pTableList); - if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = terrno; - return NULL; - } - } else { // Create one table group. - STableKeyInfo info = {.lastKey = 0, .uid = pScanNode->scan.uid, .groupId = 0}; - taosArrayPush(pTableListInfo->pTableList, &info); + code = extractTableSchemaInfo(pHandle, pScanNode->scan.uid, pTaskInfo); + if (code != TSDB_CODE_SUCCESS) { + pTaskInfo->code = code; + return NULL; } - return createLastrowScanOperator(pScanNode, pHandle, pTableListInfo->pTableList, pTaskInfo); + return createLastrowScanOperator(pScanNode, pHandle, pTaskInfo); } else { ASSERT(0); } @@ -4928,6 +4930,9 @@ static void doDestroyTableList(STableListInfo* pTableqinfoList) { if (pTableqinfoList->needSortTableByGroupId) { for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) { SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i); + if (tmp == pTableqinfoList->pTableList) { + continue; + } taosArrayDestroy(tmp); } } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 7b784e9f32a2cdcd78e7fe32609aded1b786c86e..f6352f421a8590a9e80e172f80ecaf87836b4ca3 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2222,7 +2222,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_cache_last_row", .type = FUNCTION_TYPE_CACHE_LAST_ROW, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .translateFunc = translateFirstLast, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup,