From 6c332af1aed075d544e3b224d9f7a65b85bc72a7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 28 Mar 2022 20:11:02 +0800 Subject: [PATCH] [td-13039] support sort. --- source/common/test/commonTests.cpp | 4 +- source/libs/executor/inc/executorimpl.h | 3 +- source/libs/executor/src/executorimpl.c | 82 +++++++++++++++---------- source/libs/executor/test/sortTests.cpp | 4 +- 4 files changed, 54 insertions(+), 39 deletions(-) diff --git a/source/common/test/commonTests.cpp b/source/common/test/commonTests.cpp index ccd800d3f4..d60261dfca 100644 --- a/source/common/test/commonTests.cpp +++ b/source/common/test/commonTests.cpp @@ -167,10 +167,10 @@ TEST(testCase, Datablock_test) { printf("the second row of binary:%s, length:%d\n", (char*)varDataVal(pData), varDataLen(pData)); SArray* pOrderInfo = taosArrayInit(3, sizeof(SBlockOrderInfo)); - SBlockOrderInfo order = {.order = TSDB_ORDER_ASC, .colIndex = 0}; + SBlockOrderInfo order = {.nullFirst = true, .order = TSDB_ORDER_ASC, .slotId = 0}; taosArrayPush(pOrderInfo, &order); - blockDataSort(b, pOrderInfo, true); + blockDataSort(b, pOrderInfo); blockDataDestroy(b); taosArrayDestroy(pOrderInfo); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 957678e953..8ef02201d4 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -577,6 +577,7 @@ typedef struct SSessionAggOperatorInfo { int32_t numOfRows; // number of rows int32_t start; // start row index bool reptScan; // next round scan + int64_t gap; // session window gap } SSessionAggOperatorInfo; typedef struct SStateWindowOperatorInfo { @@ -660,7 +661,7 @@ SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 266f910eb5..474a357d2b 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -336,6 +336,9 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) { SColumnInfoData idata = {{0}}; SSlotDescNode* pDescNode = nodesListGetNode(pNode->pSlots, i); idata.info.type = pDescNode->dataType.type; + if (pDescNode->dataType.bytes > 1000) { + pDescNode->dataType.bytes = 4; + } idata.info.bytes = pDescNode->dataType.bytes; idata.info.scale = pDescNode->dataType.scale; idata.info.slotId = pDescNode->slotId; @@ -1794,18 +1797,17 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock *pBlock) { } static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { - STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - STableQueryInfo* item = pRuntimeEnv->current; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; // primary timestamp column SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0); - bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); +// bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); SOptrBasicInfo* pBInfo = &pInfo->binfo; - int64_t gap = pOperator->pRuntimeEnv->pQueryAttr->sw.gap; + int64_t gap = pInfo->gap; pInfo->numOfRows = 0; - if (IS_REPEAT_SCAN(pRuntimeEnv) && !pInfo->reptScan) { + if (/*IS_REPEAT_SCAN(pRuntimeEnv) && */!pInfo->reptScan) { pInfo->reptScan = true; pInfo->prevTs = INT64_MIN; } @@ -1829,16 +1831,15 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator } else { // start a new session window SResultRow* pResult = NULL; + int64_t gid = pSDataBlock->info.groupId; pInfo->curWindow.ekey = pInfo->curWindow.skey; - int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.uid, &pInfo->curWindow, masterScan, - &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, - pBInfo->rowCellInfoOffset); - if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); - } +// int32_t ret = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, gid, pInfo->binfo.pCtx, +// numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); +// if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code +// longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); +// } -// doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, -// pSDataBlock->info.rows, pOperator->numOfOutput); +// doApplyFunctions(pInfo->binfo.pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); pInfo->curWindow.skey = tsList[j]; pInfo->curWindow.ekey = tsList[j]; @@ -1851,15 +1852,14 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator SResultRow* pResult = NULL; pInfo->curWindow.ekey = pInfo->curWindow.skey; - int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.uid, &pInfo->curWindow, masterScan, - &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, - pBInfo->rowCellInfoOffset); - if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); - } +// int32_t ret = setResultOutputBufByKey_rv(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.uid, &pInfo->curWindow, masterScan, +// &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, +// pBInfo->rowCellInfoOffset); +// if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code +// longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); +// } -// doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, -// pSDataBlock->info.rows, pOperator->numOfOutput); +// doApplyFunctions(pInfo->binfo.pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { @@ -7719,25 +7719,27 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper return pOperator; } -SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo) { SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols, pTaskInfo->id.str); + int32_t numOfRows = 4096; + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); + pInfo->gap = gap; pInfo->binfo.pRes = pResBlock; pInfo->prevTs = INT64_MIN; pInfo->reptScan = false; pOperator->name = "SessionWindowAggOperator"; -// pOperator->operatorType = OP_SessionWindow; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; @@ -8577,18 +8579,30 @@ static SArray* createSortInfo(SNodeList* pNodeList); size_t size = LIST_LENGTH(pPhyNode->pChildren); assert(size == 1); - for (int32_t i = 0; i < size; ++i) { - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); - SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); + SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); - SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; + SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; - int32_t num = 0; - SExprInfo *pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &num); - SSDataBlock *pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - SArray *info = createSortInfo(pSortPhyNode->pSortKeys); - return createSortOperatorInfo(op, pExprInfo, num, pResBlock, info, pTaskInfo); - } + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &num); + SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + SArray* info = createSortInfo(pSortPhyNode->pSortKeys); + + return createSortOperatorInfo(op, pExprInfo, num, pResBlock, info, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == nodeType(pPhyNode)) { + size_t size = LIST_LENGTH(pPhyNode->pChildren); + assert(size == 1); + + SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); + SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + + SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; + + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); + SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + return createSessionAggOperatorInfo(op, pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo); } else { ASSERT(0); }/*else if (pPhyNode->info.type == OP_MultiTableAggregate) { diff --git a/source/libs/executor/test/sortTests.cpp b/source/libs/executor/test/sortTests.cpp index 586aed7a67..ecea24135f 100644 --- a/source/libs/executor/test/sortTests.cpp +++ b/source/libs/executor/test/sortTests.cpp @@ -98,14 +98,14 @@ int32_t docomp(const void* p1, const void* p2, void* param) { for(int32_t i = 0; i < pInfo->size; ++i) { SBlockOrderInfo* pOrder = (SBlockOrderInfo*)TARRAY_GET_ELEM(pInfo, i); - SColumnInfoData* pLeftColInfoData = (SColumnInfoData*)TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->colIndex); + SColumnInfoData* pLeftColInfoData = (SColumnInfoData*)TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId); bool leftNull = false; if (pLeftColInfoData->hasNull) { leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg); } - SColumnInfoData* pRightColInfoData = (SColumnInfoData*) TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->colIndex); + SColumnInfoData* pRightColInfoData = (SColumnInfoData*) TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId); bool rightNull = false; if (pRightColInfoData->hasNull) { rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg); -- GitLab