diff --git a/src/client/src/tscGlobalmerge.c b/src/client/src/tscGlobalmerge.c index 3c769f46d8e170f3c02b4d4f07a624d596ed132c..a22269f98e270ea681f5e6bc25a3865b21b8988e 100644 --- a/src/client/src/tscGlobalmerge.c +++ b/src/client/src/tscGlobalmerge.c @@ -650,49 +650,49 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD } for(int32_t i = 0; i < pBlock->info.rows; ++i) { - if (pInfo->hasPrev) { - if (needToMerge(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) { - doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr); - } else { - doFinalizeResultImpl(pInfo, pCtx, numOfExpr); + if (!pInfo->hasPrev) { + doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr); + savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev); + continue; + } - int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput); - setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows); + if (needToMerge(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) { + doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr); + continue; + } - pInfo->binfo.pRes->info.rows += numOfRows; + doFinalizeResultImpl(pInfo, pCtx, numOfExpr); - for(int32_t j = 0; j < numOfExpr; ++j) { - pCtx[j].pOutput += (pCtx[j].outputBytes * numOfRows); - if (pCtx[j].functionId == TSDB_FUNC_TOP || pCtx[j].functionId == TSDB_FUNC_BOTTOM || - pCtx[j].functionId == TSDB_FUNC_SAMPLE || pCtx[j].functionId == TSDB_FUNC_UNIQUE || - pCtx[j].functionId == TSDB_FUNC_TAIL) { - if(j > 0) pCtx[j].ptsOutputBuf = pCtx[j - 1].pOutput; - } - } + int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput); + setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows); - for(int32_t j = 0; j < numOfExpr; ++j) { - if (pCtx[j].functionId < 0) { - continue; - } - { - assert(!TSDB_FUNC_IS_SCALAR(pCtx[j].functionId)); - aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo); - } - } + pInfo->binfo.pRes->info.rows += numOfRows; - doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr); + for(int32_t j = 0; j < numOfExpr; ++j) { + pCtx[j].pOutput += (pCtx[j].outputBytes * numOfRows); + if (pCtx[j].functionId == TSDB_FUNC_TOP || pCtx[j].functionId == TSDB_FUNC_BOTTOM || + pCtx[j].functionId == TSDB_FUNC_SAMPLE || pCtx[j].functionId == TSDB_FUNC_UNIQUE || + pCtx[j].functionId == TSDB_FUNC_TAIL) { + if(j > 0) pCtx[j].ptsOutputBuf = pCtx[j - 1].pOutput; + } + } + + for(int32_t j = 0; j < numOfExpr; ++j) { + if (pCtx[j].functionId < 0) { + continue; + } + { + assert(!TSDB_FUNC_IS_SCALAR(pCtx[j].functionId)); + aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo); } - } else { - doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr); } + doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr); savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev); } - { - for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - pCtx[i].pInput = addrPtr[i]; - } + for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + pCtx[i].pInput = addrPtr[i]; } tfree(addrPtr); @@ -900,6 +900,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { SMultiwayMergeInfo *pAggInfo = pOperator->info; SOperatorInfo *upstream = pOperator->upstream[0]; + SQueryAttr *pQueryAttr = pOperator->pRuntimeEnv->pQueryAttr; *newgroup = false; bool handleData = false; @@ -910,7 +911,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { pAggInfo->hasPrev = false; // now we start from a new group data set. // not belongs to the same group, return the result of current group; - setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pAggInfo->pExistBlock, TSDB_ORDER_ASC); + setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pAggInfo->pExistBlock, pQueryAttr->order.order); updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pAggInfo->pExistBlock->info.rows, pOperator->pRuntimeEnv, true); { // reset output buffer @@ -956,13 +957,13 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { *newgroup = true; pAggInfo->hasDataBlockForNewGroup = true; pAggInfo->pExistBlock = pBlock; - savePrevOrderColumns(pAggInfo->prevRow, pAggInfo->groupColumnList, pBlock, 0, &pAggInfo->hasPrev); + savePrevOrderColumns(pAggInfo->currentGroupColData, pAggInfo->groupColumnList, pBlock, 0, &pAggInfo->hasGroupColData); break; } } // not belongs to the same group, return the result of current group - setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); + setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, pQueryAttr->order.order); updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor, pOperator->pRuntimeEnv, true); doExecuteFinalMerge(pOperator, pOperator->numOfOutput, pBlock); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index e7fce50b6aa17bf26cf263b52b9ab9bb827ca352..cd4d53941e2000cfec0e330fdb3f826a555ba0b1 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -2643,8 +2643,8 @@ static void updateLastScanOrderIfNeeded(SQueryInfo* pQueryInfo) { } pExpr->base.numOfParams = 1; - pExpr->base.param->i64 = TSDB_ORDER_ASC; - pExpr->base.param->nType = TSDB_DATA_TYPE_INT; + pExpr->base.param[0].i64 = TSDB_ORDER_ASC; + pExpr->base.param[0].nType = TSDB_DATA_TYPE_INT; } } } @@ -6713,8 +6713,6 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq const char* msg5 = "only primary timestamp/column in top/bottom function allowed as order column"; const char* msg6 = "only primary timestamp allowed as the second order column"; const char* msg7 = "only primary timestamp/column in groupby clause allowed as order column"; - const char* msg8 = "only column in groupby clause allowed as order column"; - const char* msg9 = "orderby column must projected in subquery"; const char* msg10 = "not support distinct mixed with order by"; const char* msg11 = "not support order with udf"; const char* msg12 = "order by tags not supported with diff/derivative/csum/mavg"; @@ -6847,87 +6845,49 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq return invalidOperationMsg(pMsgBuf, msg3); } - size_t s = taosArrayGetSize(pSortOrder); - if (s == 1) { - if (orderByTags) { - if (tscIsDiffDerivLikeQuery(pQueryInfo)) { - return invalidOperationMsg(pMsgBuf, msg12); - } - //pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); - - CommonItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); - pQueryInfo->groupbyExpr.orderType = p1->sortOrder; - } else if (orderByGroupbyCol) { - CommonItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); - - pQueryInfo->groupbyExpr.orderType = p1->sortOrder; - pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; - if (udf) { - return invalidOperationMsg(pMsgBuf, msg11); - } - } else if (isTopBottomUniqueQuery(pQueryInfo)) { - /* order of top/bottom query in interval is not valid */ + if (orderByTags) { + if (tscIsDiffDerivLikeQuery(pQueryInfo)) { + return invalidOperationMsg(pMsgBuf, msg12); + } + //pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); - int32_t pos = tscExprTopBottomIndex(pQueryInfo); - assert(pos > 0); - SExprInfo* pExpr = tscExprGet(pQueryInfo, pos - 1); - assert(pExpr->base.functionId == TSDB_FUNC_TS); + pQueryInfo->groupbyExpr.orderType = pItem->sortOrder; + } else if (orderByGroupbyCol) { - pExpr = tscExprGet(pQueryInfo, pos); + pQueryInfo->groupbyExpr.orderType = pItem->sortOrder; + if (udf) { + return invalidOperationMsg(pMsgBuf, msg11); + } + } else if (isTopBottomUniqueQuery(pQueryInfo)) { + /* order of top/bottom query in interval is not valid */ - if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { - return invalidOperationMsg(pMsgBuf, msg5); - } + int32_t pos = tscExprTopBottomIndex(pQueryInfo); + assert(pos > 0); + SExprInfo* pExpr = tscExprGet(pQueryInfo, pos - 1); + assert(pExpr->base.functionId == TSDB_FUNC_TS); - CommonItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); - pQueryInfo->order.order = p1->sortOrder; - pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; - return TSDB_CODE_SUCCESS; - } else { - CommonItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); + pExpr = tscExprGet(pQueryInfo, pos); - if (udf) { - return invalidOperationMsg(pMsgBuf, msg11); - } - - pQueryInfo->order.order = p1->sortOrder; - pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; - - // orderby ts query on super table - if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { - bool found = false; - for (int32_t i = 0; i < tscNumOfExprs(pQueryInfo); ++i) { - SExprInfo* pExpr = tscExprGet(pQueryInfo, i); - if (pExpr->base.functionId == TSDB_FUNC_PRJ && pExpr->base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { - found = true; - break; - } - } - if (!found && pQueryInfo->pDownstream) { - return invalidOperationMsg(pMsgBuf, msg9); - } - addPrimaryTsColIntoResult(pQueryInfo, pCmd); - } + if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { + return invalidOperationMsg(pMsgBuf, msg5); } + + pQueryInfo->order.order = pItem->sortOrder; + pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; + return TSDB_CODE_SUCCESS; } else { - pItem = taosArrayGet(pSqlNode->pSortOrder, 0); - if (orderByTags) { - //pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); - pQueryInfo->groupbyExpr.orderType = pItem->sortOrder; - } else if (orderByGroupbyCol){ - pQueryInfo->order.order = pItem->sortOrder; - pQueryInfo->order.orderColId = index.columnIndex; - if (udf) { - return invalidOperationMsg(pMsgBuf, msg11); - } - } else { - pQueryInfo->order.order = pItem->sortOrder; - pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; - if (udf) { - return invalidOperationMsg(pMsgBuf, msg11); - } + if (udf) { + return invalidOperationMsg(pMsgBuf, msg11); } + pQueryInfo->order.order = pItem->sortOrder; + pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; + // orderby ts query on super table + if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { + addPrimaryTsColIntoResult(pQueryInfo, pCmd); + } + } + if(taosArrayGetSize(pSortOrder) == 2){ SStrToken cname = {0}; pItem = taosArrayGet(pSqlNode->pSortOrder, 1); if (pItem->isJsonExp){ @@ -6946,12 +6906,10 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { return invalidOperationMsg(pMsgBuf, msg6); - } else { - pQueryInfo->order.order = pItem->sortOrder; - pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; } + pQueryInfo->order.order = pItem->sortOrder; + pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; } - } else if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo)) { // check order by clause for normal table & temp table if (getColumnIndexByName(&columnName, pQueryInfo, &index, pMsgBuf) != TSDB_CODE_SUCCESS) { return invalidOperationMsg(pMsgBuf, msg1); @@ -6973,48 +6931,26 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq return invalidOperationMsg(pMsgBuf, msg11); } - if (udf) { - return invalidOperationMsg(pMsgBuf, msg11); - } - - CommonItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); - //pQueryInfo->groupbyExpr.orderIndex = pSchema[index.columnIndex].colId; - pQueryInfo->groupbyExpr.orderType = p1->sortOrder; + pQueryInfo->groupbyExpr.orderType = pItem->sortOrder; } if (isTopBottomUniqueQuery(pQueryInfo)) { - SArray *columnInfo = pQueryInfo->groupbyExpr.columnInfo; - if (columnInfo != NULL && taosArrayGetSize(columnInfo) > 0) { - SColIndex* pColIndex = taosArrayGet(columnInfo, 0); - - if (pColIndex->colIndex != index.columnIndex) { - return invalidOperationMsg(pMsgBuf, msg8); - } - } else { - int32_t pos = tscExprTopBottomIndex(pQueryInfo); - assert(pos > 0); - SExprInfo* pExpr = tscExprGet(pQueryInfo, pos - 1); - assert(pExpr->base.functionId == TSDB_FUNC_TS); + int32_t pos = tscExprTopBottomIndex(pQueryInfo); + assert(pos > 0); + SExprInfo* pExpr = tscExprGet(pQueryInfo, pos - 1); + assert(pExpr->base.functionId == TSDB_FUNC_TS); - pExpr = tscExprGet(pQueryInfo, pos); + pExpr = tscExprGet(pQueryInfo, pos); - if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { - return invalidOperationMsg(pMsgBuf, msg5); - } + if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { + return invalidOperationMsg(pMsgBuf, msg5); } - - pItem = taosArrayGet(pSqlNode->pSortOrder, 0); - pQueryInfo->order.order = pItem->sortOrder; - - pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; - return TSDB_CODE_SUCCESS; } if (udf) { return invalidOperationMsg(pMsgBuf, msg11); } - pItem = taosArrayGet(pSqlNode->pSortOrder, 0); pQueryInfo->order.order = pItem->sortOrder; pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; } else { @@ -7051,7 +6987,6 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq } } - pItem = taosArrayGet(pSqlNode->pSortOrder, 0); pQueryInfo->order.order = pItem->sortOrder; pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 5c9f0934a0443c123805f70eb7d8939e0d339515..dee771d5286c4a187626430c8a12e6db5502279d 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -283,48 +283,23 @@ static int compareRowData(const void *a, const void *b, const void *userData) { return (in1 != NULL && in2 != NULL) ? supporter->comFunc(in1, in2) : 0; } -static void sortGroupResByOrderList(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pDataBlock, SQLFunctionCtx *pCtx) { - int32_t size = pRuntimeEnv->pQueryAttr->pGroupbyExpr == NULL? 0: pRuntimeEnv->pQueryAttr->pGroupbyExpr->numOfGroupCols; - if (pRuntimeEnv->pQueryAttr->interval.interval > 0) size++; - - if (size <= 0) { - return; - } - - int32_t orderId = pRuntimeEnv->pQueryAttr->order.orderColId; - if (orderId <= 0) { - return; - } - - int32_t orderIndex = -1; - for (int32_t j = 0; j < pDataBlock->info.numOfCols; ++j) { - if (pCtx[j].colId == orderId) { - orderIndex = j; - break; - } - } - if (orderIndex < 0) { +static void sortGroupResByOrderList(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pDataBlock) { + if (pRuntimeEnv->pQueryAttr->pGroupbyExpr == NULL || pRuntimeEnv->pQueryAttr->pGroupbyExpr->numOfGroupCols <= 0){ return; } + SColIndex* pColIndex = taosArrayGet(pRuntimeEnv->pQueryAttr->pGroupbyExpr->columnInfo, 0); - bool found = false; int16_t dataOffset = 0; - + int16_t type = 0; for (int32_t j = 0; j < pDataBlock->info.numOfCols; ++j) { SColumnInfoData* pColInfoData = (SColumnInfoData *)taosArrayGet(pDataBlock->pDataBlock, j); - if (orderIndex == j) { - found = true; + if (pColInfoData->info.colId == pColIndex->colId) { + type = pColInfoData->info.type; break; } dataOffset += pColInfoData->info.bytes; } - - if (found == false) { - return; - } - - int16_t type = pRuntimeEnv->pQueryAttr->pExpr1[orderIndex].base.resType; SRowCompSupporter support = {.pRuntimeEnv = pRuntimeEnv, .dataOffset = dataOffset, .comFunc = getComparFunc(type, 0)}; taosArraySortPWithExt(pGroupResInfo->pRows, compareRowData, &support); } @@ -7159,7 +7134,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->binfo.resultRowInfo); if (!pRuntimeEnv->pQueryAttr->stableQuery) { - sortGroupResByOrderList(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes, pInfo->binfo.pCtx); + sortGroupResByOrderList(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); } toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);