diff --git a/src/client/src/tscGlobalmerge.c b/src/client/src/tscGlobalmerge.c index f6a9b8e257ce2c4a4dac4b0026030abf64d1ac6b..a22269f98e270ea681f5e6bc25a3865b21b8988e 100644 --- a/src/client/src/tscGlobalmerge.c +++ b/src/client/src/tscGlobalmerge.c @@ -650,48 +650,49 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD } for(int32_t i = 0; i < pBlock->info.rows; ++i) { - if (pInfo->hasPrev) { - if (needToMerge(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) { - doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr); - } else { - doFinalizeResultImpl(pInfo, pCtx, numOfExpr); + if (!pInfo->hasPrev) { + doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr); + savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev); + continue; + } - int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput); - setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows); + if (needToMerge(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) { + doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr); + continue; + } - pInfo->binfo.pRes->info.rows += numOfRows; + doFinalizeResultImpl(pInfo, pCtx, numOfExpr); - for(int32_t j = 0; j < numOfExpr; ++j) { - pCtx[j].pOutput += (pCtx[j].outputBytes * numOfRows); - if (pCtx[j].functionId == TSDB_FUNC_TOP || pCtx[j].functionId == TSDB_FUNC_BOTTOM || - pCtx[j].functionId == TSDB_FUNC_SAMPLE || pCtx[j].functionId == TSDB_FUNC_UNIQUE) { - if(j > 0) pCtx[j].ptsOutputBuf = pCtx[j - 1].pOutput; - } - } + int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput); + setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows); - for(int32_t j = 0; j < numOfExpr; ++j) { - if (pCtx[j].functionId < 0) { - continue; - } - { - assert(!TSDB_FUNC_IS_SCALAR(pCtx[j].functionId)); - aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo); - } - } + pInfo->binfo.pRes->info.rows += numOfRows; - doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr); + for(int32_t j = 0; j < numOfExpr; ++j) { + pCtx[j].pOutput += (pCtx[j].outputBytes * numOfRows); + if (pCtx[j].functionId == TSDB_FUNC_TOP || pCtx[j].functionId == TSDB_FUNC_BOTTOM || + pCtx[j].functionId == TSDB_FUNC_SAMPLE || pCtx[j].functionId == TSDB_FUNC_UNIQUE || + pCtx[j].functionId == TSDB_FUNC_TAIL) { + if(j > 0) pCtx[j].ptsOutputBuf = pCtx[j - 1].pOutput; + } + } + + for(int32_t j = 0; j < numOfExpr; ++j) { + if (pCtx[j].functionId < 0) { + continue; + } + { + assert(!TSDB_FUNC_IS_SCALAR(pCtx[j].functionId)); + aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo); } - } else { - doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr); } + doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr); savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev); } - { - for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - pCtx[i].pInput = addrPtr[i]; - } + for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + pCtx[i].pInput = addrPtr[i]; } tfree(addrPtr); @@ -899,6 +900,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { SMultiwayMergeInfo *pAggInfo = pOperator->info; SOperatorInfo *upstream = pOperator->upstream[0]; + SQueryAttr *pQueryAttr = pOperator->pRuntimeEnv->pQueryAttr; *newgroup = false; bool handleData = false; @@ -909,7 +911,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { pAggInfo->hasPrev = false; // now we start from a new group data set. // not belongs to the same group, return the result of current group; - setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pAggInfo->pExistBlock, TSDB_ORDER_ASC); + setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pAggInfo->pExistBlock, pQueryAttr->order.order); updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pAggInfo->pExistBlock->info.rows, pOperator->pRuntimeEnv, true); { // reset output buffer @@ -955,13 +957,13 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { *newgroup = true; pAggInfo->hasDataBlockForNewGroup = true; pAggInfo->pExistBlock = pBlock; - savePrevOrderColumns(pAggInfo->prevRow, pAggInfo->groupColumnList, pBlock, 0, &pAggInfo->hasPrev); + savePrevOrderColumns(pAggInfo->currentGroupColData, pAggInfo->groupColumnList, pBlock, 0, &pAggInfo->hasGroupColData); break; } } // not belongs to the same group, return the result of current group - setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); + setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, pQueryAttr->order.order); updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor, pOperator->pRuntimeEnv, true); doExecuteFinalMerge(pOperator, pOperator->numOfOutput, pBlock); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index c72a7a772f3f7fb49b5d0249ed927fff32fb6f9b..071b034fa298678d83a655a32ce26e39bf195eaf 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1072,7 +1072,7 @@ static bool isTopBottomUniqueQuery(SQueryInfo* pQueryInfo) { int32_t functionId = tscExprGet(pQueryInfo, i)->base.functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM - || functionId == TSDB_FUNC_UNIQUE) { + || functionId == TSDB_FUNC_UNIQUE || functionId == TSDB_FUNC_TAIL) { return true; } } @@ -2643,8 +2643,8 @@ static void updateLastScanOrderIfNeeded(SQueryInfo* pQueryInfo) { } pExpr->base.numOfParams = 1; - pExpr->base.param->i64 = TSDB_ORDER_ASC; - pExpr->base.param->nType = TSDB_DATA_TYPE_INT; + pExpr->base.param[0].i64 = TSDB_ORDER_ASC; + pExpr->base.param[0].nType = TSDB_DATA_TYPE_INT; } } } @@ -2693,7 +2693,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col const char* msg26 = "start param cannot be 0 with 'log_bin'"; const char* msg27 = "factor param cannot be negative or equal to 0/1"; const char* msg28 = "the second paramter of diff should be 0 or 1"; - const char* msg29 = "key timestamp column cannot be used to unique/mode function"; + const char* msg29 = "key timestamp column cannot be used to unique/mode/tail function"; + const char* msg30 = "offset is out of range [0, 100]"; switch (functionId) { case TSDB_FUNC_COUNT: { @@ -2853,7 +2854,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col // 2. check if sql function can be applied on this column data type SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); - if (functionId == TSDB_FUNC_MODE && pColumnSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX ){ + if (functionId == TSDB_FUNC_MODE && pColumnSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX && + pColumnSchema->type == TSDB_DATA_TYPE_TIMESTAMP){ return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg29); } else if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId != TSDB_FUNC_ELAPSED) && (functionId != TSDB_FUNC_MODE)) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); @@ -3105,12 +3107,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col case TSDB_FUNC_SAMPLE: case TSDB_FUNC_PERCT: case TSDB_FUNC_APERCT: - case TSDB_FUNC_UNIQUE: { + case TSDB_FUNC_UNIQUE: + case TSDB_FUNC_TAIL: { // 1. valid the number of parameters bool valid = true; if (pItem->pNode->Expr.paramList == NULL) { valid = false; - } else if (functionId == TSDB_FUNC_APERCT) { + } else if (functionId == TSDB_FUNC_APERCT || functionId == TSDB_FUNC_TAIL) { size_t cnt = taosArrayGetSize(pItem->pNode->Expr.paramList); if (cnt != 2 && cnt != 3) valid = false; } else if (functionId == TSDB_FUNC_UNIQUE) { @@ -3136,20 +3139,22 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); } - if (index.columnIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX && functionId == TSDB_FUNC_UNIQUE) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg29); - } pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); + if (index.columnIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX && pSchema->type == TSDB_DATA_TYPE_TIMESTAMP && + (functionId == TSDB_FUNC_UNIQUE || functionId == TSDB_FUNC_TAIL)) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg29); + } + // functions can not be applied to tags if (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); } // 2. valid the column type - if (functionId != TSDB_FUNC_SAMPLE && functionId != TSDB_FUNC_UNIQUE && !IS_NUMERIC_TYPE(pSchema->type)) { + if (functionId != TSDB_FUNC_SAMPLE && functionId != TSDB_FUNC_UNIQUE && functionId != TSDB_FUNC_TAIL && !IS_NUMERIC_TYPE(pSchema->type)) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); } @@ -3258,13 +3263,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col } else { tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true); - int64_t numRowsSelected = GET_INT32_VAL(val); + int64_t numRowsSelected = GET_INT64_VAL(val); if (functionId != TSDB_FUNC_UNIQUE && (numRowsSelected <= 0 || numRowsSelected > 100)) { // todo use macro return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg12); } if(functionId == TSDB_FUNC_UNIQUE){ - GET_INT32_VAL(val) = MAX_UNIQUE_RESULT_ROWS; + GET_INT64_VAL(val) = MAX_UNIQUE_RESULT_ROWS; } // todo REFACTOR // set the first column ts for top/bottom query @@ -3281,7 +3286,25 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), resultSize, false); - tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t)); + if (functionId == TSDB_FUNC_TAIL){ + int64_t offset = 0; + if (taosArrayGetSize(pItem->pNode->Expr.paramList) == 3){ + tSqlExprItem* para = taosArrayGet(pItem->pNode->Expr.paramList, 2); + if (para->pNode->tokenId == TK_ID || para->pNode->value.nType != TSDB_DATA_TYPE_BIGINT) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + } + offset = para->pNode->value.i64; + if (offset < 0 || offset > 100) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg30); + } + } + GET_INT64_VAL(val) = numRowsSelected + offset; + tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t)); + GET_INT64_VAL(val) = offset; + tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t)); + }else{ + tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t)); + } } memset(pExpr->base.aliasName, 0, tListLen(pExpr->base.aliasName)); @@ -4010,7 +4033,8 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) { (functionId == TSDB_FUNC_ELAPSED) || (functionId == TSDB_FUNC_HISTOGRAM) || (functionId == TSDB_FUNC_UNIQUE) || - (functionId == TSDB_FUNC_MODE)) { + (functionId == TSDB_FUNC_MODE) || + (functionId == TSDB_FUNC_TAIL)) { if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->base.param[0].i64, &type, &bytes, &interBytes, 0, true, NULL) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_OPERATION; @@ -6686,7 +6710,6 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq const char* msg6 = "only primary timestamp allowed as the second order column"; const char* msg7 = "only primary timestamp/column in groupby clause allowed as order column"; const char* msg8 = "only column in groupby clause allowed as order column"; - const char* msg9 = "orderby column must projected in subquery"; const char* msg10 = "not support distinct mixed with order by"; const char* msg11 = "not support order with udf"; const char* msg12 = "order by tags not supported with diff/derivative/csum/mavg"; @@ -6819,87 +6842,48 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq return invalidOperationMsg(pMsgBuf, msg3); } - size_t s = taosArrayGetSize(pSortOrder); - if (s == 1) { - if (orderByTags) { - if (tscIsDiffDerivLikeQuery(pQueryInfo)) { - return invalidOperationMsg(pMsgBuf, msg12); - } - //pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); - - CommonItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); - pQueryInfo->groupbyExpr.orderType = p1->sortOrder; - } else if (orderByGroupbyCol) { - CommonItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); - - pQueryInfo->groupbyExpr.orderType = p1->sortOrder; - pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; - if (udf) { - return invalidOperationMsg(pMsgBuf, msg11); - } - } else if (isTopBottomUniqueQuery(pQueryInfo)) { - /* order of top/bottom query in interval is not valid */ - - int32_t pos = tscExprTopBottomIndex(pQueryInfo); - assert(pos > 0); - SExprInfo* pExpr = tscExprGet(pQueryInfo, pos - 1); - assert(pExpr->base.functionId == TSDB_FUNC_TS); + if (orderByTags) { + if (tscIsDiffDerivLikeQuery(pQueryInfo)) { + return invalidOperationMsg(pMsgBuf, msg12); + } + //pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); - pExpr = tscExprGet(pQueryInfo, pos); + pQueryInfo->groupbyExpr.orderType = pItem->sortOrder; + } else if (orderByGroupbyCol) { - if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { - return invalidOperationMsg(pMsgBuf, msg5); - } + pQueryInfo->groupbyExpr.orderType = pItem->sortOrder; + if (udf) { + return invalidOperationMsg(pMsgBuf, msg11); + } + } else if (isTopBottomUniqueQuery(pQueryInfo)) { + /* order of top/bottom query in interval is not valid */ - CommonItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); - pQueryInfo->order.order = p1->sortOrder; - pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; - return TSDB_CODE_SUCCESS; - } else { - CommonItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); + int32_t pos = tscExprTopBottomIndex(pQueryInfo); + assert(pos > 0); + SExprInfo* pExpr = tscExprGet(pQueryInfo, pos - 1); + assert(pExpr->base.functionId == TSDB_FUNC_TS); - if (udf) { - return invalidOperationMsg(pMsgBuf, msg11); - } + pExpr = tscExprGet(pQueryInfo, pos); - pQueryInfo->order.order = p1->sortOrder; - pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; - - // orderby ts query on super table - if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { - bool found = false; - for (int32_t i = 0; i < tscNumOfExprs(pQueryInfo); ++i) { - SExprInfo* pExpr = tscExprGet(pQueryInfo, i); - if (pExpr->base.functionId == TSDB_FUNC_PRJ && pExpr->base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { - found = true; - break; - } - } - if (!found && pQueryInfo->pDownstream) { - return invalidOperationMsg(pMsgBuf, msg9); - } - addPrimaryTsColIntoResult(pQueryInfo, pCmd); - } + if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { + return invalidOperationMsg(pMsgBuf, msg5); } + + pQueryInfo->order.order = pItem->sortOrder; + pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; } else { - pItem = taosArrayGet(pSqlNode->pSortOrder, 0); - if (orderByTags) { - //pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); - pQueryInfo->groupbyExpr.orderType = pItem->sortOrder; - } else if (orderByGroupbyCol){ - pQueryInfo->order.order = pItem->sortOrder; - pQueryInfo->order.orderColId = index.columnIndex; - if (udf) { - return invalidOperationMsg(pMsgBuf, msg11); - } - } else { - pQueryInfo->order.order = pItem->sortOrder; - pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; - if (udf) { - return invalidOperationMsg(pMsgBuf, msg11); - } + if (udf) { + return invalidOperationMsg(pMsgBuf, msg11); } + pQueryInfo->order.order = pItem->sortOrder; + pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; + // orderby ts query on super table + if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { + addPrimaryTsColIntoResult(pQueryInfo, pCmd); + } + } + if(taosArrayGetSize(pSortOrder) == 2){ SStrToken cname = {0}; pItem = taosArrayGet(pSqlNode->pSortOrder, 1); if (pItem->isJsonExp){ @@ -6918,12 +6902,10 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { return invalidOperationMsg(pMsgBuf, msg6); - } else { - pQueryInfo->order.order = pItem->sortOrder; - pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; } + pQueryInfo->order.order = pItem->sortOrder; + pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; } - } else if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo)) { // check order by clause for normal table & temp table if (getColumnIndexByName(&columnName, pQueryInfo, &index, pMsgBuf) != TSDB_CODE_SUCCESS) { return invalidOperationMsg(pMsgBuf, msg1); @@ -6945,13 +6927,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq return invalidOperationMsg(pMsgBuf, msg11); } - if (udf) { - return invalidOperationMsg(pMsgBuf, msg11); - } - - CommonItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); - //pQueryInfo->groupbyExpr.orderIndex = pSchema[index.columnIndex].colId; - pQueryInfo->groupbyExpr.orderType = p1->sortOrder; + pQueryInfo->groupbyExpr.orderType = pItem->sortOrder; } if (isTopBottomUniqueQuery(pQueryInfo)) { @@ -6974,19 +6950,12 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq return invalidOperationMsg(pMsgBuf, msg5); } } - - pItem = taosArrayGet(pSqlNode->pSortOrder, 0); - pQueryInfo->order.order = pItem->sortOrder; - - pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; - return TSDB_CODE_SUCCESS; } if (udf) { return invalidOperationMsg(pMsgBuf, msg11); } - pItem = taosArrayGet(pSqlNode->pSortOrder, 0); pQueryInfo->order.order = pItem->sortOrder; pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; } else { @@ -7023,7 +6992,6 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq } } - pItem = taosArrayGet(pSqlNode->pSortOrder, 0); pQueryInfo->order.order = pItem->sortOrder; pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; } @@ -8485,7 +8453,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char* if (IS_MULTIOUTPUT(aAggs[f].status) && f != TSDB_FUNC_TOP && f != TSDB_FUNC_BOTTOM && f != TSDB_FUNC_DIFF && f != TSDB_FUNC_MAVG && f != TSDB_FUNC_CSUM && f != TSDB_FUNC_SAMPLE && f != TSDB_FUNC_DERIVATIVE && f != TSDB_FUNC_TAGPRJ && f != TSDB_FUNC_PRJ && - f != TSDB_FUNC_UNIQUE) { + f != TSDB_FUNC_UNIQUE && f != TSDB_FUNC_TAIL) { return invalidOperationMsg(msg, msg1); } @@ -10089,7 +10057,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf const char* msg4 = "interval query not supported, since the result of sub query not include valid timestamp column"; const char* msg5 = "only tag query not compatible with normal column filter"; const char* msg6 = "not support stddev/percentile in the outer query yet"; - const char* msg7 = "derivative/twa/rate/irate/diff requires timestamp column exists in subquery"; + const char* msg7 = "derivative/twa/rate/irate/diff/tail requires timestamp column exists in subquery"; const char* msg8 = "condition missing for join query"; const char* msg9 = "not support 3 level select"; @@ -10158,12 +10126,6 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf if (f == TSDB_FUNC_STDDEV || f == TSDB_FUNC_PERCT) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); } - - if ((timeWindowQuery || pQueryInfo->stateWindow) && f == TSDB_FUNC_LAST) { - pExpr->base.numOfParams = 1; - pExpr->base.param[0].i64 = TSDB_ORDER_ASC; - pExpr->base.param[0].nType = TSDB_DATA_TYPE_INT; - } } STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta; @@ -10177,7 +10139,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf int32_t f = pExpr->base.functionId; if (f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE || - f == TSDB_FUNC_RATE || f == TSDB_FUNC_DIFF) { + f == TSDB_FUNC_RATE || f == TSDB_FUNC_DIFF || f == TSDB_FUNC_TAIL) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7); } } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index d8fc838858d2c4fdd81aad741dd26c76d743eb5c..84ae4bdb6b455718530bd3e74c4929c629cca81f 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -690,7 +690,8 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) { functionId == TSDB_FUNC_TS_COMP || functionId == TSDB_FUNC_SAMPLE || functionId == TSDB_FUNC_HISTOGRAM || - functionId == TSDB_FUNC_UNIQUE)) { + functionId == TSDB_FUNC_UNIQUE || + functionId == TSDB_FUNC_TAIL)) { return true; } } @@ -2659,7 +2660,7 @@ int32_t tscExprTopBottomIndex(SQueryInfo* pQueryInfo){ if (pExpr == NULL) continue; if (pExpr->base.functionId == TSDB_FUNC_TOP || pExpr->base.functionId == TSDB_FUNC_BOTTOM - || pExpr->base.functionId == TSDB_FUNC_UNIQUE) { + || pExpr->base.functionId == TSDB_FUNC_UNIQUE || pExpr->base.functionId == TSDB_FUNC_TAIL) { return i; } } @@ -4938,7 +4939,8 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu pse->colType = pExpr->base.resType; if(pExpr->base.resBytes > INT16_MAX && - (pExpr->base.functionId == TSDB_FUNC_UNIQUE || pExpr->base.functionId == TSDB_FUNC_MODE)){ + (pExpr->base.functionId == TSDB_FUNC_UNIQUE || pExpr->base.functionId == TSDB_FUNC_MODE + || pExpr->base.functionId == TSDB_FUNC_TAIL)){ pQueryAttr->interBytesForGlobal = pExpr->base.resBytes; }else{ pse->colBytes = pExpr->base.resBytes; @@ -5118,8 +5120,6 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt } } - pQueryAttr->uniqueQuery = isUniqueQuery(numOfOutput, pQueryAttr->pExpr1); - pQueryAttr->tableCols = calloc(numOfCols, sizeof(SColumnInfo)); for(int32_t i = 0; i < numOfCols; ++i) { SColumn* pCol = taosArrayGetP(pQueryInfo->colList, i); diff --git a/src/query/inc/qAggMain.h b/src/query/inc/qAggMain.h index ab506b7061a5091b642ce5d2598df3afbd81a578..ae5574888fadf262ddf8fa5018ae0a9e530c16da 100644 --- a/src/query/inc/qAggMain.h +++ b/src/query/inc/qAggMain.h @@ -80,8 +80,9 @@ extern "C" { #define TSDB_FUNC_HISTOGRAM 38 #define TSDB_FUNC_UNIQUE 39 #define TSDB_FUNC_MODE 40 +#define TSDB_FUNC_TAIL 41 -#define TSDB_FUNC_MAX_NUM 41 +#define TSDB_FUNC_MAX_NUM 42 #define TSDB_FUNCSTATE_SO 0x1u // single output #define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 23c67793fe237e8070672856d9c195b62f2f9683..a8b781718cafa7fdb381205609e97ea6f96fd409 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -223,7 +223,6 @@ typedef struct SQueryAttr { bool stableQuery; // super table query or not bool topBotQuery; // TODO used bitwise flag - bool uniqueQuery; bool groupbyColumn; // denote if this is a groupby normal column query bool hasTagResults; // if there are tag values in final result or not bool timeWindowInterpo;// if the time window start/end required interpolation @@ -734,5 +733,4 @@ void addTableReadRows(SQueryRuntimeEnv* pEnv, int32_t tid, int32_t rows); // tsdb scan table callback table or query is over. param is SQueryRuntimeEnv* bool qReadOverCB(void* param, int8_t type, int32_t tid); -bool isUniqueQuery(int32_t numOfOutput, SExprInfo* pExprs); #endif // TDENGINE_QEXECUTOR_H diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 04efee90ecf1aea4a210f6febf2962cb4c9b087c..6dd7f17f7f9f9dd67e6a28dbc8b9d5579600119c 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -243,6 +243,16 @@ typedef struct { char res[]; } SModeFuncInfo; +typedef struct { + int64_t timestamp; + char data[]; +} TailUnit; + +typedef struct STailInfo { + int32_t num; + TailUnit **res; +} STailInfo; + int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type, int32_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) { if (!isValidDataType(dataType)) { @@ -387,17 +397,23 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI return TSDB_CODE_SUCCESS; } else if (functionId == TSDB_FUNC_MODE) { - *type = TSDB_DATA_TYPE_BINARY; - int64_t size = sizeof(ModeUnit) + dataBytes; - size *= MAX_MODE_INNER_RESULT_ROWS; - size += sizeof(SModeFuncInfo); - if (size > MAX_MODE_INNER_RESULT_SIZE){ - size = MAX_MODE_INNER_RESULT_SIZE; - } - *bytes = (int32_t)size; - *interBytes = *bytes; + *type = TSDB_DATA_TYPE_BINARY; + int64_t size = sizeof(ModeUnit) + dataBytes; + size *= MAX_MODE_INNER_RESULT_ROWS; + size += sizeof(SModeFuncInfo); + if (size > MAX_MODE_INNER_RESULT_SIZE){ + size = MAX_MODE_INNER_RESULT_SIZE; + } + *bytes = (int32_t)size; + *interBytes = *bytes; - return TSDB_CODE_SUCCESS; + return TSDB_CODE_SUCCESS; + } else if (functionId == TSDB_FUNC_TAIL) { + *type = TSDB_DATA_TYPE_BINARY; + *bytes = (sizeof(STailInfo) + (sizeof(TailUnit) + dataBytes + POINTER_BYTES + extLength) * param); + *interBytes = *bytes; + + return TSDB_CODE_SUCCESS; } else if (functionId == TSDB_FUNC_SAMPLE) { *type = TSDB_DATA_TYPE_BINARY; *bytes = (sizeof(SSampleFuncInfo) + dataBytes*param + sizeof(int64_t)*param + extLength*param); @@ -521,7 +537,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI *type = (int16_t)dataType; *bytes = dataBytes; - size_t size = sizeof(STopBotInfo) + (sizeof(tValuePair) + extLength) * param; + size_t size = sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param; // the output column may be larger than sizeof(STopBotInfo) *interBytes = (int32_t)size; @@ -545,8 +561,15 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI size = MAX_MODE_INNER_RESULT_SIZE; } *interBytes = (int32_t)size; - return TSDB_CODE_SUCCESS; - }else if (functionId == TSDB_FUNC_SAMPLE) { + } else if (functionId == TSDB_FUNC_TAIL) { + *type = (int16_t)dataType; + *bytes = dataBytes; + + size_t size = (sizeof(STailInfo) + (sizeof(TailUnit) + dataBytes + POINTER_BYTES + extLength) * param); + + // the output column may be larger than sizeof(STopBotInfo) + *interBytes = (int32_t)size; + } else if (functionId == TSDB_FUNC_SAMPLE) { *type = (int16_t)dataType; *bytes = dataBytes; size_t size = sizeof(SSampleFuncInfo) + dataBytes*param + sizeof(int64_t)*param + extLength*param; @@ -945,6 +968,23 @@ static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_ } } +static int32_t tailFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { + // not initialized yet, it is the first block, load it. + if (pCtx->pOutput == NULL) { + return BLK_DATA_ALL_NEEDED; + } + + // the pCtx should be set to current Ctx and output buffer before call this function. Otherwise, pCtx->pOutput is + // the previous windowRes output buffer, not current unloaded block. In this case, the following filter is invalid + STailInfo *pInfo = (STailInfo*) (pCtx->pOutput); + TailUnit **pList = pInfo->res; + if (pInfo->num >= pCtx->param[0].i64 && pList[0]->timestamp > w->ekey){ + return BLK_DATA_NO_NEEDED; + } else { + return BLK_DATA_ALL_NEEDED; + } +} + ////////////////////////////////////////////////////////////////////////////////////////////// /* * The intermediate result of average is kept in the interResultBuf. @@ -2506,11 +2546,11 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) { tValuePair **tvp = pRes->res; // user specify the order of output by sort the result according to timestamp - if (pCtx->param[1].i64 == PRIMARYKEY_TIMESTAMP_COL_INDEX) { - __compar_fn_t comparator = (pCtx->param[2].i64 == TSDB_ORDER_ASC) ? resAscComparFn : resDescComparFn; + if (pCtx->param[2].i64 == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + __compar_fn_t comparator = (pCtx->param[3].i64 == TSDB_ORDER_ASC) ? resAscComparFn : resDescComparFn; qsort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator); - } else /*if (pCtx->param[1].i64 > PRIMARYKEY_TIMESTAMP_COL_INDEX)*/ { - __compar_fn_t comparator = (pCtx->param[2].i64 == TSDB_ORDER_ASC) ? resDataAscComparFn : resDataDescComparFn; + } else /*if (pCtx->param[2].i64 > PRIMARYKEY_TIMESTAMP_COL_INDEX)*/ { + __compar_fn_t comparator = (pCtx->param[3].i64 == TSDB_ORDER_ASC) ? resDataAscComparFn : resDataDescComparFn; qsort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator); } @@ -5116,21 +5156,18 @@ static void histogram_func_finalizer(SQLFunctionCtx *pCtx) { doFinalizer(pCtx); } -// unique -static void copyUniqueRes(SQLFunctionCtx *pCtx, int32_t bytes) { - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - SUniqueFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); - - size_t size = sizeof(UniqueUnit) + bytes + pCtx->tagInfo.tagsLen; +// unique&tail copy +static void copyRes(SQLFunctionCtx *pCtx, void *data, int32_t bytes) { + size_t size = sizeof(int64_t) + bytes + pCtx->tagInfo.tagsLen; int32_t len = (int32_t)(GET_RES_INFO(pCtx)->numOfRes); char *tsOutput = pCtx->ptsOutputBuf; char *output = pCtx->pOutput; - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->param[2].i64); - char *tvp = pRes->res + (size * ((pCtx->param[2].i64 == TSDB_ORDER_ASC) ? 0 : len -1)); + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->param[3].i64); + char *tvp = data + (size * ((pCtx->param[3].i64 == TSDB_ORDER_ASC) ? 0 : len -1)); for (int32_t i = 0; i < len; ++i) { memcpy(tsOutput, tvp, sizeof(int64_t)); - memcpy(output, tvp + sizeof(UniqueUnit), bytes); + memcpy(output, tvp + sizeof(int64_t), bytes); tvp += (step * size); tsOutput += sizeof(int64_t); output += bytes; @@ -5147,9 +5184,9 @@ static void copyUniqueRes(SQLFunctionCtx *pCtx, int32_t bytes) { pData[i] = pCtx->tagInfo.pTagCtxList[i]->pOutput; } - tvp = pRes->res + (size * ((pCtx->param[2].i64 == TSDB_ORDER_ASC) ? 0 : len -1)); + tvp = data + (size * ((pCtx->param[3].i64 == TSDB_ORDER_ASC) ? 0 : len -1)); for (int32_t i = 0; i < len; ++i) { - int32_t offset = (int32_t)sizeof(UniqueUnit) + bytes; + int32_t offset = (int32_t)sizeof(int64_t) + bytes; for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) { memcpy(pData[j], tvp + offset, (size_t)pCtx->tagInfo.pTagCtxList[j]->outputBytes); offset += pCtx->tagInfo.pTagCtxList[j]->outputBytes; @@ -5257,10 +5294,10 @@ static void unique_function_merge(SQLFunctionCtx *pCtx) { typedef struct{ int32_t dataOffset; __compar_fn_t comparFn; -} UiqueSupporter; +} SortSupporter; -static int32_t uniqueCompareFn(const void *p1, const void *p2, const void *param) { - UiqueSupporter *support = (UiqueSupporter *)param; +static int32_t sortCompareFn(const void *p1, const void *p2, const void *param) { + SortSupporter *support = (SortSupporter *)param; return support->comparFn((const char*)p1 + support->dataOffset, (const char*)p2 + support->dataOffset); } @@ -5278,19 +5315,19 @@ static void unique_func_finalizer(SQLFunctionCtx *pCtx) { bytes = pCtx->inputBytes; type = pCtx->inputType; } - UiqueSupporter support = {0}; + SortSupporter support = {0}; // user specify the order of output by sort the result according to timestamp - if (pCtx->param[1].i64 == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + if (pCtx->param[2].i64 == PRIMARYKEY_TIMESTAMP_COL_INDEX) { support.dataOffset = 0; support.comparFn = compareInt64Val; } else{ - support.dataOffset = sizeof(UniqueUnit); + support.dataOffset = sizeof(int64_t); support.comparFn = getComparFunc(type, 0); } - size_t size = sizeof(UniqueUnit) + bytes + pCtx->tagInfo.tagsLen; - taosqsort(pInfo->res, (size_t)GET_RES_INFO(pCtx)->numOfRes, size, &support, uniqueCompareFn); - copyUniqueRes(pCtx, bytes); + size_t size = sizeof(int64_t) + bytes + pCtx->tagInfo.tagsLen; + taosqsort(pInfo->res, (size_t)GET_RES_INFO(pCtx)->numOfRes, size, &support, sortCompareFn); + copyRes(pCtx, pInfo->res, bytes); doFinalizer(pCtx); } @@ -5402,6 +5439,194 @@ static void mode_func_finalizer(SQLFunctionCtx *pCtx) { doFinalizer(pCtx); } +static void buildTailStruct(STailInfo *pTailInfo, SQLFunctionCtx *pCtx) { + char *tmp = (char *)pTailInfo + sizeof(STailInfo); + pTailInfo->res = (TailUnit**) tmp; + tmp += POINTER_BYTES * pCtx->param[0].i64; + + int32_t bytes = 0; + if (pCtx->currentStage == MERGE_STAGE) { + bytes = pCtx->outputBytes; + } else { + bytes = pCtx->inputBytes; + } + size_t size = sizeof(TailUnit) + bytes + pCtx->tagInfo.tagsLen; + + for (int32_t i = 0; i < pCtx->param[0].i64; ++i) { + pTailInfo->res[i] = (TailUnit*) tmp; + tmp += size; + } +} + +static void valueTailAssign(TailUnit *dst, int32_t bytes, const char *val, int64_t tsKey, + SExtTagsInfo *pTagInfo, char *pTags, int16_t stage) { + dst->timestamp = tsKey; + memcpy(dst->data, val, bytes); + + if (stage == MERGE_STAGE) { + memcpy(dst->data + bytes, pTags, (size_t)pTagInfo->tagsLen); + } else { // the tags are dumped from the ctx tag fields + int32_t size = 0; + for (int32_t i = 0; i < pTagInfo->numOfTagCols; ++i) { + SQLFunctionCtx* ctx = pTagInfo->pTagCtxList[i]; + if (ctx->functionId == TSDB_FUNC_TS_DUMMY) { + ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; + ctx->tag.i64 = tsKey; + } + + tVariantDump(&ctx->tag, ctx->pOutput, ctx->tag.nType, true); + memcpy(dst->data + bytes + size, ctx->pOutput, ctx->outputBytes); + size += ctx->outputBytes; + } + } +} + +static int32_t tailComparFn(const void *p1, const void *p2, const void *param) { + TailUnit *d1 = *(TailUnit **) p1; + TailUnit *d2 = *(TailUnit **) p2; + return compareInt64Val(d1, d2); +} + +static void tailSwapFn(void *dst, void *src, const void *param) +{ + TailUnit **vdst = (TailUnit **) dst; + TailUnit **vsrc = (TailUnit **) src; + + TailUnit *tmp = *vdst; + *vdst = *vsrc; + *vsrc = tmp; +} + +static void do_tail_function_add(STailInfo *pInfo, int32_t maxLen, void *pData, int64_t ts, int32_t bytes, + SExtTagsInfo *pTagInfo, char *pTags, int16_t stage) { + TailUnit **pList = pInfo->res; + + if (pInfo->num < maxLen) { + valueTailAssign(pList[pInfo->num], bytes, pData, ts, pTagInfo, pTags, stage); + + taosheapsort((void *) pList, sizeof(TailUnit **), pInfo->num + 1, NULL, tailComparFn, NULL, tailSwapFn, 0); + + pInfo->num++; + } else if(pList[0]->timestamp < ts) { + valueTailAssign(pList[0], bytes, pData, ts, pTagInfo, pTags, stage); + taosheapadjust((void *) pList, sizeof(TailUnit **), 0, maxLen - 1, NULL, tailComparFn, NULL, tailSwapFn, 0); + } +} + +static bool tail_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { + if (!function_setup(pCtx, pResInfo)) { + return false; + } + + STailInfo *pInfo = getOutputInfo(pCtx); + buildTailStruct(pInfo, pCtx); + return true; +} + +static void tail_function(SQLFunctionCtx *pCtx) { + STailInfo *pRes = getOutputInfo(pCtx); + +// if (pCtx->stableQuery){ + for (int32_t i = 0; i < pCtx->size; ++i) { + char *data = GET_INPUT_DATA(pCtx, i); + + TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0; + do_tail_function_add(pRes, (int32_t)pCtx->param[0].i64, data, ts, + pCtx->inputBytes, &pCtx->tagInfo, NULL, pCtx->currentStage); + } +// }else{ +// for (int32_t i = pCtx->size - 1; i >= 0; --i) { +// if (pRes->offset++ < (int32_t)pCtx->param[1].i64){ +// continue; +// } +// if (pRes->num >= (int32_t)(pCtx->param[0].i64 - pCtx->param[1].i64)){ // query complete +// pCtx->resultInfo->complete = true; +// for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) { +// SQLFunctionCtx *ctx = pCtx->tagInfo.pTagCtxList[j]; +// ctx->resultInfo->complete = true; +// } +// break; +// } +// char *data = GET_INPUT_DATA(pCtx, i); +// +// TSKEY ts = (pCtx->ptsList != NULL)? GET_TS_DATA(pCtx, i):0; +// +// valueTailAssign(pRes->res[pRes->num], pCtx->inputBytes, data, ts, &pCtx->tagInfo, NULL, pCtx->currentStage); +// +// pRes->num++; +// } +// } + + // treat the result as only one result + GET_RES_INFO(pCtx)->numOfRes = 1; +} + +static void tail_func_merge(SQLFunctionCtx *pCtx) { + STailInfo *pInput = (STailInfo *)GET_INPUT_DATA_LIST(pCtx); + + // construct the input data struct from binary data + buildTailStruct(pInput, pCtx); + + STailInfo *pOutput = getOutputInfo(pCtx); + + // the intermediate result is binary, we only use the output data type + for (int32_t i = 0; i < pInput->num; ++i) { + do_tail_function_add(pOutput, (int32_t)pCtx->param[0].i64, pInput->res[i]->data, pInput->res[i]->timestamp, + pCtx->outputBytes, &pCtx->tagInfo, pInput->res[i]->data + pCtx->outputBytes, pCtx->currentStage); + } + + GET_RES_INFO(pCtx)->numOfRes = pOutput->num; +} + +static void tail_func_finalizer(SQLFunctionCtx *pCtx) { + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + + // data in temporary list is less than the required number of results, not enough qualified number of results + STailInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); + + int32_t bytes = 0; + int32_t type = 0; + if (pCtx->currentStage == MERGE_STAGE) { + bytes = pCtx->outputBytes; + type = pCtx->outputType; + assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); + } else { + bytes = pCtx->inputBytes; + type = pCtx->inputType; + } + +// if(pCtx->stableQuery){ + GET_RES_INFO(pCtx)->numOfRes = pRes->num - pCtx->param[1].i64; +// }else{ +// GET_RES_INFO(pCtx)->numOfRes = pRes->num; +// } + if (GET_RES_INFO(pCtx)->numOfRes <= 0) return; + + taosqsort(pRes->res, pRes->num, POINTER_BYTES, NULL, tailComparFn); + + size_t size = sizeof(int64_t) + bytes + pCtx->tagInfo.tagsLen; + void *data = calloc(size, GET_RES_INFO(pCtx)->numOfRes); + if(!data){ + qError("calloc error in tail_func_finalizer: size:%d, num:%d", (int32_t)size, GET_RES_INFO(pCtx)->numOfRes); + return; + } + for(int32_t i = 0; i < GET_RES_INFO(pCtx)->numOfRes; i++){ + memcpy(data + i * size, pRes->res[i], size); + } + + SortSupporter support = {0}; + // user specify the order of output by sort the result according to timestamp + if (pCtx->param[2].i64 != PRIMARYKEY_TIMESTAMP_COL_INDEX) { + support.dataOffset = sizeof(int64_t); + support.comparFn = getComparFunc(type, 0); + taosqsort(data, (size_t)GET_RES_INFO(pCtx)->numOfRes, size, &support, sortCompareFn); + } + + copyRes(pCtx, data, bytes); + free(data); + doFinalizer(pCtx); +} + ///////////////////////////////////////////////////////////////////////////////////////////// /* * function compatible list. @@ -5422,8 +5647,8 @@ int32_t functionCompatList[] = { 1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1, // tid_tag, deriv, csum, mavg, sample, 6, 8, -1, -1, -1, - // block_info,elapsed,histogram,unique,mode - 7, 1, -1, -1, 1 + // block_info,elapsed,histogram,unique,mode,tail + 7, 1, -1, -1, 1, -1 }; SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{ @@ -5920,5 +6145,17 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{ mode_func_finalizer, mode_function_merge, dataBlockRequired, - } + }, + { + // 41 + "tail", + TSDB_FUNC_TAIL, + TSDB_FUNC_TAIL, + TSDB_BASE_FUNC_MO | TSDB_FUNCSTATE_SELECTIVITY, + tail_function_setup, + tail_function, + tail_func_finalizer, + tail_func_merge, + tailFuncRequired, + } }; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 79a46d37bf791733bde960be74a9b67a8bc2bc2d..9279b66e646d9e73ff1a74f7f27740e1e6098c05 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -47,11 +47,6 @@ #define MULTI_KEY_DELIM "-" -#define TIME_WINDOW_COPY(_dst, _src) do {\ - (_dst).skey = (_src).skey;\ - (_dst).ekey = (_src).ekey;\ -} while (0) - enum { TS_JOIN_TS_EQUAL = 0, TS_JOIN_TS_NOT_EQUALS = 1, @@ -288,48 +283,28 @@ static int compareRowData(const void *a, const void *b, const void *userData) { return (in1 != NULL && in2 != NULL) ? supporter->comFunc(in1, in2) : 0; } -static void sortGroupResByOrderList(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pDataBlock, SQLFunctionCtx *pCtx) { - int32_t size = pRuntimeEnv->pQueryAttr->pGroupbyExpr == NULL? 0: pRuntimeEnv->pQueryAttr->pGroupbyExpr->numOfGroupCols; - if (pRuntimeEnv->pQueryAttr->interval.interval > 0) size++; - - if (size <= 0) { +static void sortGroupResByOrderList(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pDataBlock) { + if (pRuntimeEnv->pQueryAttr->pGroupbyExpr == NULL || pRuntimeEnv->pQueryAttr->pGroupbyExpr->numOfGroupCols <= 0){ return; } - int32_t orderId = pRuntimeEnv->pQueryAttr->order.orderColId; - if (orderId <= 0) { + if (pRuntimeEnv->pQueryAttr->order.orderColId <= 0){ return; } - int32_t orderIndex = -1; - for (int32_t j = 0; j < pDataBlock->info.numOfCols; ++j) { - if (pCtx[j].colId == orderId) { - orderIndex = j; - break; - } - } - if (orderIndex < 0) { - return; - } + SColIndex* pColIndex = taosArrayGet(pRuntimeEnv->pQueryAttr->pGroupbyExpr->columnInfo, 0); - bool found = false; int16_t dataOffset = 0; - + int16_t type = 0; for (int32_t j = 0; j < pDataBlock->info.numOfCols; ++j) { SColumnInfoData* pColInfoData = (SColumnInfoData *)taosArrayGet(pDataBlock->pDataBlock, j); - if (orderIndex == j) { - found = true; + if (pColInfoData->info.colId == pColIndex->colId) { + type = pColInfoData->info.type; break; } dataOffset += pColInfoData->info.bytes; } - - if (found == false) { - return; - } - - int16_t type = pRuntimeEnv->pQueryAttr->pExpr1[orderIndex].base.resType; SRowCompSupporter support = {.pRuntimeEnv = pRuntimeEnv, .dataOffset = dataOffset, .comFunc = getComparFunc(type, 0)}; taosArraySortPWithExt(pGroupResInfo->pRows, compareRowData, &support); } @@ -1962,7 +1937,8 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr pCtx->inputType = pSqlExpr->colType; if (pRuntimeEnv->pQueryAttr->interBytesForGlobal > INT16_MAX && - (pSqlExpr->functionId == TSDB_FUNC_UNIQUE || pSqlExpr->functionId == TSDB_FUNC_MODE)){ + (pSqlExpr->functionId == TSDB_FUNC_UNIQUE || pSqlExpr->functionId == TSDB_FUNC_MODE + || pSqlExpr->functionId == TSDB_FUNC_TAIL)){ pCtx->inputBytes = pRuntimeEnv->pQueryAttr->interBytesForGlobal; }else{ pCtx->inputBytes = pSqlExpr->colBytes; @@ -2001,16 +1977,15 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr int32_t functionId = pCtx->functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM - || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_UNIQUE) { + || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_UNIQUE + || functionId == TSDB_FUNC_TAIL) { int32_t f = pExpr[i-1].base.functionId; assert(f == TSDB_FUNC_TS || f == TSDB_FUNC_TS_DUMMY); - pCtx->param[2].i64 = pQueryAttr->order.order; - pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; - pCtx->param[3].i64 = functionId; + pCtx->param[3].i64 = pQueryAttr->order.order; pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT; - pCtx->param[1].i64 = pQueryAttr->order.orderColId; + pCtx->param[2].i64 = pQueryAttr->order.orderColId; } else if (functionId == TSDB_FUNC_INTERP) { pCtx->param[2].i64 = (int8_t)pQueryAttr->fillType; if (pQueryAttr->fillVal != NULL) { @@ -3187,7 +3162,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa if ((*status) != BLK_DATA_ALL_NEEDED) { // the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet, // the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer - if (QUERY_IS_INTERVAL_QUERY(pQueryAttr) && (!pQueryAttr->pointInterpQuery) && (!pQueryAttr->uniqueQuery)) { + if (QUERY_IS_INTERVAL_QUERY(pQueryAttr) && (!pQueryAttr->pointInterpQuery)) { SResultRow* pResult = NULL; bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); @@ -3199,7 +3174,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - } else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery) && (!pQueryAttr->pointInterpQuery) && (!pQueryAttr->uniqueQuery)) { // stable aggregate, not interval aggregate or normal column aggregate + } else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery) && (!pQueryAttr->pointInterpQuery)) { // stable aggregate, not interval aggregate or normal column aggregate doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx, pTableScanInfo->rowCellInfoOffset, pTableScanInfo->numOfOutput, pRuntimeEnv->current->groupIndex); @@ -3698,7 +3673,8 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i // set the timestamp output buffer for top/bottom/diff query int32_t fid = pCtx[i].functionId; if (fid == TSDB_FUNC_TOP || fid == TSDB_FUNC_BOTTOM || fid == TSDB_FUNC_DIFF || fid == TSDB_FUNC_DERIVATIVE || - fid == TSDB_FUNC_SAMPLE || fid == TSDB_FUNC_MAVG || fid == TSDB_FUNC_CSUM || fid == TSDB_FUNC_UNIQUE) { + fid == TSDB_FUNC_SAMPLE || fid == TSDB_FUNC_MAVG || fid == TSDB_FUNC_CSUM || fid == TSDB_FUNC_UNIQUE || + fid == TSDB_FUNC_TAIL) { if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput; } else if (fid == TSDB_FUNC_INTERP) { assert(pCtx[0].functionId == TSDB_FUNC_TS_DUMMY || pCtx[0].functionId == TSDB_FUNC_TS); @@ -3769,7 +3745,8 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE || functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_MAVG || - functionId == TSDB_FUNC_SAMPLE || functionId == TSDB_FUNC_UNIQUE) { + functionId == TSDB_FUNC_SAMPLE || functionId == TSDB_FUNC_UNIQUE || + functionId == TSDB_FUNC_TAIL) { if (i > 0) pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput; } else if (functionId == TSDB_FUNC_INTERP) { assert(pBInfo->pCtx[0].functionId == TSDB_FUNC_TS_DUMMY || pBInfo->pCtx[0].functionId == TSDB_FUNC_TS); @@ -3945,15 +3922,6 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult } } -bool isUniqueQuery(int32_t numOfOutput, SExprInfo* pExprs) { - for (int32_t i = 0; i < numOfOutput; ++i) { - if (pExprs[i].base.functionId == TSDB_FUNC_UNIQUE) { - return true; - } - } - return false; -} - static bool hasMainOutput(SQueryAttr *pQueryAttr) { for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { int32_t functionId = pQueryAttr->pExpr1[i].base.functionId; @@ -4044,7 +4012,7 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_MAVG || functionId == TSDB_FUNC_SAMPLE || - functionId == TSDB_FUNC_UNIQUE) { + functionId == TSDB_FUNC_UNIQUE || functionId == TSDB_FUNC_TAIL) { if(i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput; } @@ -4114,7 +4082,8 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE || functionId == TSDB_FUNC_SAMPLE || functionId == TSDB_FUNC_MAVG || - functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_UNIQUE) { + functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_UNIQUE || + functionId == TSDB_FUNC_TAIL) { if(i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput; } @@ -5104,14 +5073,13 @@ STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) { .numOfCols = pQueryAttr->numOfCols, .type = BLOCK_LOAD_OFFSET_SEQ_ORDER, .loadExternalRows = false, + .twindow = *win, }; // set offset with if(pQueryAttr->skipOffset) { cond.offset = pQueryAttr->limit.offset; } - - TIME_WINDOW_COPY(cond.twindow, *win); return cond; } @@ -5279,8 +5247,6 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { qDebug("QInfo:0x%"PRIx64" start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, GET_QID(pRuntimeEnv), cond.twindow.skey, cond.twindow.ekey); - pRuntimeEnv->scanFlag = REVERSE_SCAN; - pTableScanInfo->times = 1; pTableScanInfo->current = 0; pTableScanInfo->reverseTimes = 0; @@ -6876,7 +6842,6 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t order = pQueryAttr->order.order; - SOperatorInfo* upstream = pOperator->upstream[0]; STableId prevId = {0, 0}; @@ -6906,7 +6871,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { } pOperator->status = OP_RES_TO_RETURN; - pQueryAttr->order.order = order; // TODO : restore the order + pQueryAttr->order.order = order; // TODO : restore the order doCloseAllTimeWindow(pRuntimeEnv); setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); @@ -6927,7 +6892,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); int16_t bytes = pColInfoData->info.bytes; - int16_t type = pColInfoData->info.type; +// int16_t type = pColInfoData->info.type; SColumnInfoData* pTsColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0); TSKEY* tsList = (TSKEY*)pTsColInfoData->pData; @@ -6939,9 +6904,9 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI pInfo->numOfRows = 0; for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { char* val = ((char*)pColInfoData->pData) + bytes * j; - if (isNull(val, type)) { - continue; - } +// if (isNull(val, type)) { +// continue; +// } if (pInfo->prevData == NULL) { pInfo->prevData = malloc(bytes); memcpy(pInfo->prevData, val, bytes); @@ -7162,7 +7127,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->binfo.resultRowInfo); if (!pRuntimeEnv->pQueryAttr->stableQuery) { - sortGroupResByOrderList(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes, pInfo->binfo.pCtx); + sortGroupResByOrderList(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); } toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); @@ -9006,7 +8971,7 @@ static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SCol for (int32_t i = 0; i < numOfOutput; ++i) { int16_t functId = pExprs[i].base.functionId; - if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM || functId == TSDB_FUNC_SAMPLE || functId == TSDB_FUNC_UNIQUE) { + if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM || functId == TSDB_FUNC_SAMPLE || functId == TSDB_FUNC_UNIQUE || functId == TSDB_FUNC_TAIL) { int32_t j = getColumnIndexInSource(pTableInfo, &pExprs[i].base, pTagCols); if (j < 0 || j >= pTableInfo->numOfCols) { return TSDB_CODE_QRY_INVALID_MSG; @@ -9591,7 +9556,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S pQueryAttr->vgId = vgId; pQueryAttr->pFilters = pFilters; pQueryAttr->range = pQueryMsg->range; - pQueryAttr->uniqueQuery = isUniqueQuery(numOfOutput, pExprs); pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); if (pQueryAttr->tableCols == NULL) { diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 6af5de813fe957d0c504f74f462e63e5a2984afc..8e39af3e6bc542f958bffd04413e92b4b242a7a3 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -38,12 +38,14 @@ int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, boo if (pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_TOP || pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_BOTTOM || pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_SAMPLE || - pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_HISTOGRAM) { + pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_HISTOGRAM || + pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_TAIL) { return (int32_t)pQueryAttr->pExpr1[i].base.param[0].i64; } - } - if (pQueryAttr->uniqueQuery){ - return MAX_UNIQUE_RESULT_ROWS; + + if (pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_UNIQUE){ + return MAX_UNIQUE_RESULT_ROWS; + } } }