From 25aa293320985298075dbe3b7a2b3a070a059d95 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 15 Feb 2022 16:19:44 +0800 Subject: [PATCH] modify unique function like top --- src/client/src/tscGlobalmerge.c | 18 +- src/client/src/tscSQLParser.c | 948 ++++++++++++++++---------------- src/query/inc/qAggMain.h | 2 +- src/query/inc/qResultbuf.h | 3 +- src/query/src/qAggMain.c | 217 +++----- src/query/src/qExecutor.c | 41 +- src/query/src/qUtil.c | 2 +- 7 files changed, 585 insertions(+), 646 deletions(-) diff --git a/src/client/src/tscGlobalmerge.c b/src/client/src/tscGlobalmerge.c index 609136ae1f..aa7cf1dd9d 100644 --- a/src/client/src/tscGlobalmerge.c +++ b/src/client/src/tscGlobalmerge.c @@ -593,7 +593,7 @@ static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput } } -static void doMergeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr, int32_t rowIndex, char** pDataPtr) { +static void doMergeResultImpl(SOperatorInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr, int32_t rowIndex, char** pDataPtr) { for (int32_t j = 0; j < numOfExpr; ++j) { pCtx[j].pInput = pDataPtr[j] + pCtx[j].inputBytes * rowIndex; } @@ -605,12 +605,20 @@ static void doMergeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, i } if (functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); + SUdfInfo* pUdfInfo = taosArrayGet(((SMultiwayMergeInfo*)(pInfo->info))->udfInfo, -1 * functionId - 1); doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE); } else { assert(!TSDB_FUNC_IS_SCALAR(functionId)); aAggs[functionId].mergeFunc(&pCtx[j]); } + + SQueryAttr* pQueryAttr = pInfo->pRuntimeEnv->pQueryAttr; + if (functionId == TSDB_FUNC_UNIQUE && + (GET_RES_INFO(&(pCtx[j]))->numOfRes > MAX_UNIQUE_RESULT_ROWS || GET_RES_INFO(&(pCtx[j]))->numOfRes == -1)){ + tscError("Unique result num is too large. num: %d, limit: %d", + GET_RES_INFO(&(pCtx[j]))->numOfRes, MAX_UNIQUE_RESULT_ROWS); + longjmp(pInfo->pRuntimeEnv->env, TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE); + } } } @@ -644,7 +652,7 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD for(int32_t i = 0; i < pBlock->info.rows; ++i) { if (pInfo->hasPrev) { if (needToMerge(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) { - doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr); + doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr); } else { doFinalizeResultImpl(pInfo, pCtx, numOfExpr); @@ -671,10 +679,10 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD } } - doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr); + doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr); } } else { - doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr); + doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr); } savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index cef3672ff1..40bb9aa4ac 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -3106,601 +3106,587 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col size_t cnt = taosArrayGetSize(pItem->pNode->Expr.paramList); if (cnt != 2 && cnt != 3) valid = false; } else if (functionId == TSDB_FUNC_UNIQUE) { - if (taosArrayGetSize(pItem->pNode->Expr.paramList) != 1) - valid = false; - else { - if (taosArrayGetSize(pItem->pNode->Expr.paramList) != 2) valid = false; - } - if (!valid) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); - } + if (taosArrayGetSize(pItem->pNode->Expr.paramList) != 1) valid = false; + }else { + if (taosArrayGetSize(pItem->pNode->Expr.paramList) != 2) valid = false; + } + if (!valid) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + } - tSqlExprItem* pParamElem = taosArrayGet(pItem->pNode->Expr.paramList, 0); - if (pParamElem->pNode->tokenId != TK_ID) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); - } + tSqlExprItem* pParamElem = taosArrayGet(pItem->pNode->Expr.paramList, 0); + if (pParamElem->pNode->tokenId != TK_ID) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + } - SColumnIndex index = COLUMN_INDEX_INITIALIZER; - if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != - TSDB_CODE_SUCCESS) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); - } + SColumnIndex index = COLUMN_INDEX_INITIALIZER; + if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != + TSDB_CODE_SUCCESS) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); + } - if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); - } + if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); + } - pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); - SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); + pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); + SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); - // functions can not be applied to tags - if (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); - } + // functions can not be applied to tags + if (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); + } - // 2. valid the column type - if (functionId != TSDB_FUNC_SAMPLE && !IS_NUMERIC_TYPE(pSchema->type)) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); + // 2. valid the column type + if (functionId != TSDB_FUNC_SAMPLE && !IS_NUMERIC_TYPE(pSchema->type)) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); + } + + tVariant* pVariant = NULL; + if (functionId != TSDB_FUNC_UNIQUE) { + // 3. valid the parameters + if (pParamElem[1].pNode->tokenId == TK_ID) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); } - tVariant* pVariant = NULL; - if (functionId != TSDB_FUNC_UNIQUE) { - // 3. valid the parameters - if (pParamElem[1].pNode->tokenId == TK_ID) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); - } + pVariant = &pParamElem[1].pNode->value; + } - pVariant = &pParamElem[1].pNode->value; - } + int16_t resultType = pSchema->type; + int32_t resultSize = pSchema->bytes; + int32_t interResult = 0; - int16_t resultType = pSchema->type; - int32_t resultSize = pSchema->bytes; - int32_t interResult = 0; + char val[8] = {0}; - char val[8] = {0}; + SExprInfo* pExpr = NULL; + if (functionId == TSDB_FUNC_PERCT || functionId == TSDB_FUNC_APERCT) { + // param1 double + if (pVariant->nType != TSDB_DATA_TYPE_DOUBLE && pVariant->nType != TSDB_DATA_TYPE_BIGINT) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5); + } + tVariantDump(pVariant, val, TSDB_DATA_TYPE_DOUBLE, true); - SExprInfo* pExpr = NULL; - if (functionId == TSDB_FUNC_PERCT || functionId == TSDB_FUNC_APERCT) { - // param1 double - if (pVariant->nType != TSDB_DATA_TYPE_DOUBLE && pVariant->nType != TSDB_DATA_TYPE_BIGINT) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5); - } - tVariantDump(pVariant, val, TSDB_DATA_TYPE_DOUBLE, true); + double dp = GET_DOUBLE_VAL(val); + if (dp < 0 || dp > TOP_BOTTOM_QUERY_LIMIT) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5); + } - double dp = GET_DOUBLE_VAL(val); - if (dp < 0 || dp > TOP_BOTTOM_QUERY_LIMIT) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5); - } + getResultDataInfo(pSchema->type, pSchema->bytes, functionId, 0, &resultType, &resultSize, &interResult, 0, + false, pUdfInfo); - getResultDataInfo(pSchema->type, pSchema->bytes, functionId, 0, &resultType, &resultSize, &interResult, 0, - false, pUdfInfo); - - /* - * sql function transformation - * for dp = 0, it is actually min, - * for dp = 100, it is max, - */ - tscInsertPrimaryTsSourceColumn(pQueryInfo, pTableMetaInfo->pTableMeta->id.uid); - colIndex += 1; // the first column is ts - - pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), - interResult, false); - tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); - - // param2 int32 - if (taosArrayGetSize(pItem->pNode->Expr.paramList) == 3) { - if (pParamElem[2].pNode != NULL) { - pVariant = &pParamElem[2].pNode->value; - // check type must string - if (pVariant->nType != TSDB_DATA_TYPE_BINARY || pVariant->pz == NULL) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg13); - } - char* pzAlgo = pVariant->pz; - int32_t algo = 0; - - if (strcasecmp(pzAlgo, "t-digest") == 0) { - algo = 1; - } else if (strcasecmp(pzAlgo, "default") == 0) { - algo = 0; - } else { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg14); - } - // append algo int32_t - tscExprAddParams(&pExpr->base, (char*)&algo, TSDB_DATA_TYPE_INT, sizeof(int32_t)); + /* + * sql function transformation + * for dp = 0, it is actually min, + * for dp = 100, it is max, + */ + tscInsertPrimaryTsSourceColumn(pQueryInfo, pTableMetaInfo->pTableMeta->id.uid); + colIndex += 1; // the first column is ts + + pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), + interResult, false); + tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); + + // param2 int32 + if (taosArrayGetSize(pItem->pNode->Expr.paramList) == 3) { + if (pParamElem[2].pNode != NULL) { + pVariant = &pParamElem[2].pNode->value; + // check type must string + if (pVariant->nType != TSDB_DATA_TYPE_BINARY || pVariant->pz == NULL) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg13); } + char* pzAlgo = pVariant->pz; + int32_t algo = 0; + + if (strcasecmp(pzAlgo, "t-digest") == 0) { + algo = 1; + } else if (strcasecmp(pzAlgo, "default") == 0) { + algo = 0; + } else { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg14); + } + // append algo int32_t + tscExprAddParams(&pExpr->base, (char*)&algo, TSDB_DATA_TYPE_INT, sizeof(int32_t)); } - } else if (functionId == TSDB_FUNC_MAVG || functionId == TSDB_FUNC_SAMPLE) { - if (pVariant->nType != TSDB_DATA_TYPE_BIGINT) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); - } + } + } else if (functionId == TSDB_FUNC_MAVG || functionId == TSDB_FUNC_SAMPLE) { + if (pVariant->nType != TSDB_DATA_TYPE_BIGINT) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + } - char* endptr = NULL; - strtoll(pParamElem[1].pNode->exprToken.z, &endptr, 10); - if ((endptr - pParamElem[1].pNode->exprToken.z != pParamElem[1].pNode->exprToken.n) || errno == ERANGE) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg18); - } - tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true); + char* endptr = NULL; + strtoll(pParamElem[1].pNode->exprToken.z, &endptr, 10); + if ((endptr - pParamElem[1].pNode->exprToken.z != pParamElem[1].pNode->exprToken.n) || errno == ERANGE) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg18); + } + tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true); - int64_t numRowsSelected = GET_INT64_VAL(val); - if (numRowsSelected <= 0 || numRowsSelected > 1000) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg15); - } + int64_t numRowsSelected = GET_INT64_VAL(val); + if (numRowsSelected <= 0 || numRowsSelected > 1000) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg15); + } - // todo REFACTOR - // set the first column ts for top/bottom query - int32_t tsFuncId = (functionId == TSDB_FUNC_MAVG) ? TSDB_FUNC_TS_DUMMY : TSDB_FUNC_TS; - SColumnIndex index1 = {index.tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX}; - pExpr = tscExprAppend(pQueryInfo, tsFuncId, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, 0, 0, false); - tstrncpy(pExpr->base.aliasName, aAggs[tsFuncId].name, sizeof(pExpr->base.aliasName)); - - const int32_t TS_COLUMN_INDEX = PRIMARYKEY_TIMESTAMP_COL_INDEX; - SColumnList ids = createColumnList(1, index.tableIndex, TS_COLUMN_INDEX); - insertResultField(pQueryInfo, colIndex, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[tsFuncId].name, - pExpr); - - colIndex += 1; // the first column is ts - - getResultDataInfo(pSchema->type, pSchema->bytes, functionId, (int32_t)numRowsSelected, &resultType, - &resultSize, &interResult, 0, false, pUdfInfo); - pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), - interResult, false); - tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t)); - } else if (functionId == TSDB_FUNC_UNIQUE) { - SColumnIndex index1 = {index.tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX}; - pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, 0, 0, false); - tstrncpy(pExpr->base.aliasName, aAggs[TSDB_FUNC_TS].name, sizeof(pExpr->base.aliasName)); - - const int32_t TS_COLUMN_INDEX = PRIMARYKEY_TIMESTAMP_COL_INDEX; - SColumnList ids = createColumnList(1, index.tableIndex, TS_COLUMN_INDEX); - insertResultField(pQueryInfo, colIndex, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, - aAggs[TSDB_FUNC_TS].name, pExpr); - - colIndex += 1; // the first column is ts - - pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), - resultSize, false); - } else { - tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true); + // todo REFACTOR + // set the first column ts for top/bottom query + int32_t tsFuncId = (functionId == TSDB_FUNC_MAVG) ? TSDB_FUNC_TS_DUMMY : TSDB_FUNC_TS; + SColumnIndex index1 = {index.tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX}; + pExpr = tscExprAppend(pQueryInfo, tsFuncId, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, 0, 0, false); + tstrncpy(pExpr->base.aliasName, aAggs[tsFuncId].name, sizeof(pExpr->base.aliasName)); - int64_t numRowsSelected = GET_INT32_VAL(val); - if (numRowsSelected <= 0 || numRowsSelected > 100) { // todo use macro - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg12); - } + const int32_t TS_COLUMN_INDEX = PRIMARYKEY_TIMESTAMP_COL_INDEX; + SColumnList ids = createColumnList(1, index.tableIndex, TS_COLUMN_INDEX); + insertResultField(pQueryInfo, colIndex, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[tsFuncId].name, + pExpr); - // todo REFACTOR - // set the first column ts for top/bottom query - SColumnIndex index1 = {index.tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX}; - pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, 0, 0, false); - tstrncpy(pExpr->base.aliasName, aAggs[TSDB_FUNC_TS].name, sizeof(pExpr->base.aliasName)); + colIndex += 1; // the first column is ts - const int32_t TS_COLUMN_INDEX = PRIMARYKEY_TIMESTAMP_COL_INDEX; - SColumnList ids = createColumnList(1, index.tableIndex, TS_COLUMN_INDEX); - insertResultField(pQueryInfo, colIndex, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, - aAggs[TSDB_FUNC_TS].name, pExpr); + getResultDataInfo(pSchema->type, pSchema->bytes, functionId, (int32_t)numRowsSelected, &resultType, + &resultSize, &interResult, 0, false, pUdfInfo); + pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), + interResult, false); + tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t)); + } else { + tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true); - colIndex += 1; // the first column is ts + int64_t numRowsSelected = GET_INT32_VAL(val); + if (numRowsSelected <= 0 || numRowsSelected > 100) { // todo use macro + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg12); + } - pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), - resultSize, false); - tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t)); + if(functionId == TSDB_FUNC_UNIQUE){ + GET_INT32_VAL(val) = MAX_UNIQUE_RESULT_ROWS; } + // todo REFACTOR + // set the first column ts for top/bottom query + SColumnIndex index1 = {index.tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX}; + pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, 0, 0, false); + tstrncpy(pExpr->base.aliasName, aAggs[TSDB_FUNC_TS].name, sizeof(pExpr->base.aliasName)); - memset(pExpr->base.aliasName, 0, tListLen(pExpr->base.aliasName)); - getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token, sizeof(pExpr->base.aliasName) - 1); + const int32_t TS_COLUMN_INDEX = PRIMARYKEY_TIMESTAMP_COL_INDEX; + SColumnList ids = createColumnList(1, index.tableIndex, TS_COLUMN_INDEX); + insertResultField(pQueryInfo, colIndex, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, + aAggs[TSDB_FUNC_TS].name, pExpr); - // todo refactor: tscColumnListInsert part - SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex); + colIndex += 1; // the first column is ts - if (finalResult) { - insertResultField(pQueryInfo, colIndex, &ids, resultSize, (int8_t)resultType, pExpr->base.aliasName, pExpr); - } else { - assert(ids.num == 1); - tscColumnListInsert(pQueryInfo->colList, ids.ids[0].columnIndex, pExpr->base.uid, pSchema); - } + pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), + resultSize, false); + tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t)); + } - return TSDB_CODE_SUCCESS; + memset(pExpr->base.aliasName, 0, tListLen(pExpr->base.aliasName)); + getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token, sizeof(pExpr->base.aliasName) - 1); + + // todo refactor: tscColumnListInsert part + SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex); + + if (finalResult) { + insertResultField(pQueryInfo, colIndex, &ids, resultSize, (int8_t)resultType, pExpr->base.aliasName, pExpr); + } else { + assert(ids.num == 1); + tscColumnListInsert(pQueryInfo->colList, ids.ids[0].columnIndex, pExpr->base.uid, pSchema); } - case TSDB_FUNC_TID_TAG: { - pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7); - } + return TSDB_CODE_SUCCESS; + } - // no parameters or more than one parameter for function - if (pItem->pNode->Expr.paramList == NULL || taosArrayGetSize(pItem->pNode->Expr.paramList) != 1) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); - } + case TSDB_FUNC_TID_TAG: { + pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7); + } - tSqlExprItem* pParamItem = taosArrayGet(pItem->pNode->Expr.paramList, 0); - tSqlExpr* pParam = pParamItem->pNode; + // no parameters or more than one parameter for function + if (pItem->pNode->Expr.paramList == NULL || taosArrayGetSize(pItem->pNode->Expr.paramList) != 1) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + } - SColumnIndex index = COLUMN_INDEX_INITIALIZER; - if (getColumnIndexByName(&pParam->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != - TSDB_CODE_SUCCESS) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); - } + tSqlExprItem* pParamItem = taosArrayGet(pItem->pNode->Expr.paramList, 0); + tSqlExpr* pParam = pParamItem->pNode; - pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); - SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta); + SColumnIndex index = COLUMN_INDEX_INITIALIZER; + if (getColumnIndexByName(&pParam->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != + TSDB_CODE_SUCCESS) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); + } - // functions can not be applied to normal columns - int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta); - if (index.columnIndex < numOfCols && index.columnIndex != TSDB_TBNAME_COLUMN_INDEX) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); - } + pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); + SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta); - if (index.columnIndex > 0) { - index.columnIndex -= numOfCols; - } + // functions can not be applied to normal columns + int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta); + if (index.columnIndex < numOfCols && index.columnIndex != TSDB_TBNAME_COLUMN_INDEX) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); + } - // 2. valid the column type - int16_t colType = 0; - if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { - colType = TSDB_DATA_TYPE_BINARY; - } else { - colType = pSchema[index.columnIndex].type; - } + if (index.columnIndex > 0) { + index.columnIndex -= numOfCols; + } - if (colType == TSDB_DATA_TYPE_BOOL) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); - } + // 2. valid the column type + int16_t colType = 0; + if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { + colType = TSDB_DATA_TYPE_BINARY; + } else { + colType = pSchema[index.columnIndex].type; + } - tscColumnListInsert(pTableMetaInfo->tagColList, index.columnIndex, pTableMetaInfo->pTableMeta->id.uid, - &pSchema[index.columnIndex]); - SSchema* pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta); + if (colType == TSDB_DATA_TYPE_BOOL) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); + } - SSchema s = {0}; - if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { - s = *tGetTbnameColumnSchema(); - } else { - s = pTagSchema[index.columnIndex]; - } + tscColumnListInsert(pTableMetaInfo->tagColList, index.columnIndex, pTableMetaInfo->pTableMeta->id.uid, + &pSchema[index.columnIndex]); + SSchema* pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta); + + SSchema s = {0}; + if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { + s = *tGetTbnameColumnSchema(); + } else { + s = pTagSchema[index.columnIndex]; + } - int32_t bytes = 0; - int16_t type = 0; - int32_t inter = 0; + int32_t bytes = 0; + int16_t type = 0; + int32_t inter = 0; - int32_t ret = getResultDataInfo(s.type, s.bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0, NULL); - assert(ret == TSDB_CODE_SUCCESS); + int32_t ret = getResultDataInfo(s.type, s.bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0, NULL); + assert(ret == TSDB_CODE_SUCCESS); - s.type = (uint8_t)type; - s.bytes = bytes; + s.type = (uint8_t)type; + s.bytes = bytes; - TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY); - tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_TID_TAG, &index, &s, TSDB_COL_TAG, getNewResColId(pCmd)); + TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY); + tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_TID_TAG, &index, &s, TSDB_COL_TAG, getNewResColId(pCmd)); - return TSDB_CODE_SUCCESS; + return TSDB_CODE_SUCCESS; + } + + case TSDB_FUNC_BLKINFO: { + // no parameters or more than one parameter for function + if (pItem->pNode->Expr.paramList != NULL && taosArrayGetSize(pItem->pNode->Expr.paramList) != 0) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); } - case TSDB_FUNC_BLKINFO: { - // no parameters or more than one parameter for function - if (pItem->pNode->Expr.paramList != NULL && taosArrayGetSize(pItem->pNode->Expr.paramList) != 0) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); - } + SColumnIndex index = { + .tableIndex = 0, + .columnIndex = 0, + }; + pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); - SColumnIndex index = { - .tableIndex = 0, - .columnIndex = 0, - }; - pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); + int32_t inter = 0; + int16_t resType = 0; + int32_t bytes = 0; - int32_t inter = 0; - int16_t resType = 0; - int32_t bytes = 0; + getResultDataInfo(TSDB_DATA_TYPE_INT, 4, TSDB_FUNC_BLKINFO, 0, &resType, &bytes, &inter, 0, 0, NULL); - getResultDataInfo(TSDB_DATA_TYPE_INT, 4, TSDB_FUNC_BLKINFO, 0, &resType, &bytes, &inter, 0, 0, NULL); + SSchema s = {.name = "block_dist", .type = TSDB_DATA_TYPE_BINARY, .bytes = bytes}; - SSchema s = {.name = "block_dist", .type = TSDB_DATA_TYPE_BINARY, .bytes = bytes}; + SExprInfo* pExpr = + tscExprInsert(pQueryInfo, 0, TSDB_FUNC_BLKINFO, &index, resType, bytes, getNewResColId(pCmd), bytes, 0); + tstrncpy(pExpr->base.aliasName, s.name, sizeof(pExpr->base.aliasName)); - SExprInfo* pExpr = - tscExprInsert(pQueryInfo, 0, TSDB_FUNC_BLKINFO, &index, resType, bytes, getNewResColId(pCmd), bytes, 0); - tstrncpy(pExpr->base.aliasName, s.name, sizeof(pExpr->base.aliasName)); + SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex); + insertResultField(pQueryInfo, 0, &ids, bytes, s.type, s.name, pExpr); - SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex); - insertResultField(pQueryInfo, 0, &ids, bytes, s.type, s.name, pExpr); + pExpr->base.numOfParams = 1; + pExpr->base.param[0].i64 = pTableMetaInfo->pTableMeta->tableInfo.rowSize; + pExpr->base.param[0].nType = TSDB_DATA_TYPE_BIGINT; - pExpr->base.numOfParams = 1; - pExpr->base.param[0].i64 = pTableMetaInfo->pTableMeta->tableInfo.rowSize; - pExpr->base.param[0].nType = TSDB_DATA_TYPE_BIGINT; + return TSDB_CODE_SUCCESS; + } - return TSDB_CODE_SUCCESS; + case TSDB_FUNC_HISTOGRAM: { + // check params + if (taosArrayGetSize(pItem->pNode->Expr.paramList) != 4) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg19); } - case TSDB_FUNC_HISTOGRAM: { - // check params - if (taosArrayGetSize(pItem->pNode->Expr.paramList) != 4) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg19); - } + tSqlExprItem* pParamElem = taosArrayGet(pItem->pNode->Expr.paramList, 0); + if (pParamElem->pNode->tokenId != TK_ID) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + } - tSqlExprItem* pParamElem = taosArrayGet(pItem->pNode->Expr.paramList, 0); - if (pParamElem->pNode->tokenId != TK_ID) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); - } + SColumnIndex index = COLUMN_INDEX_INITIALIZER; + if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != + TSDB_CODE_SUCCESS) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); + } - SColumnIndex index = COLUMN_INDEX_INITIALIZER; - if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != - TSDB_CODE_SUCCESS) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); - } + if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); + } - if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); - } + pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); + SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); - pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); - SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); + if (!IS_NUMERIC_TYPE(pSchema->type)) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); + } - if (!IS_NUMERIC_TYPE(pSchema->type)) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); - } + // bin_type param + if (pParamElem[1].pNode->tokenId == TK_ID) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + } - // bin_type param - if (pParamElem[1].pNode->tokenId == TK_ID) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); - } + tVariant* pVariant = &pParamElem[1].pNode->value; + if (pVariant == NULL || pVariant->nType != TSDB_DATA_TYPE_BINARY) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + } - tVariant* pVariant = &pParamElem[1].pNode->value; - if (pVariant == NULL || pVariant->nType != TSDB_DATA_TYPE_BINARY) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); - } + #define USER_INPUT_BIN 0 + #define LINEAR_BIN 1 + #define LOG_BIN 2 + int8_t binType; + if (strcasecmp(pVariant->pz, "user_input") == 0) { + binType = USER_INPUT_BIN; + } else if (strcasecmp(pVariant->pz, "linear_bin") == 0) { + binType = LINEAR_BIN; + } else if (strcasecmp(pVariant->pz, "log_bin") == 0) { + binType = LOG_BIN; + } else { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg20); + } + // bin_description param in JSON format + if (pParamElem[2].pNode->tokenId == TK_ID) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + } -#define USER_INPUT_BIN 0 -#define LINEAR_BIN 1 -#define LOG_BIN 2 - - int8_t binType; - if (strcasecmp(pVariant->pz, "user_input") == 0) { - binType = USER_INPUT_BIN; - } else if (strcasecmp(pVariant->pz, "linear_bin") == 0) { - binType = LINEAR_BIN; - } else if (strcasecmp(pVariant->pz, "log_bin") == 0) { - binType = LOG_BIN; - } else { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg20); - } - // bin_description param in JSON format - if (pParamElem[2].pNode->tokenId == TK_ID) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); - } + pVariant = &pParamElem[2].pNode->value; + if (pVariant == NULL && pVariant->nType != TSDB_DATA_TYPE_BINARY) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + } - pVariant = &pParamElem[2].pNode->value; - if (pVariant == NULL && pVariant->nType != TSDB_DATA_TYPE_BINARY) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + cJSON* binDesc = cJSON_Parse(pVariant->pz); + int32_t counter; + int32_t numBins; + int32_t numOutput; + double* intervals; + if (cJSON_IsObject(binDesc)) { /* linaer/log bins */ + int32_t numOfParams = cJSON_GetArraySize(binDesc); + int32_t startIndex; + if (numOfParams != 4) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg22); } - cJSON* binDesc = cJSON_Parse(pVariant->pz); - int32_t counter; - int32_t numBins; - int32_t numOutput; - double* intervals; - if (cJSON_IsObject(binDesc)) { /* linaer/log bins */ - int32_t numOfParams = cJSON_GetArraySize(binDesc); - int32_t startIndex; - if (numOfParams != 4) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg22); - } - - cJSON* start = cJSON_GetObjectItem(binDesc, "start"); - cJSON* factor = cJSON_GetObjectItem(binDesc, "factor"); - cJSON* width = cJSON_GetObjectItem(binDesc, "width"); - cJSON* count = cJSON_GetObjectItem(binDesc, "count"); - cJSON* infinity = cJSON_GetObjectItem(binDesc, "infinity"); + cJSON* start = cJSON_GetObjectItem(binDesc, "start"); + cJSON* factor = cJSON_GetObjectItem(binDesc, "factor"); + cJSON* width = cJSON_GetObjectItem(binDesc, "width"); + cJSON* count = cJSON_GetObjectItem(binDesc, "count"); + cJSON* infinity = cJSON_GetObjectItem(binDesc, "infinity"); - if (!cJSON_IsNumber(start) || !cJSON_IsNumber(count) || !cJSON_IsBool(infinity)) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg22); - } + if (!cJSON_IsNumber(start) || !cJSON_IsNumber(count) || !cJSON_IsBool(infinity)) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg22); + } - if (count->valueint <= 0 || count->valueint > 1000) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg25); - } + if (count->valueint <= 0 || count->valueint > 1000) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg25); + } - if (isinf(start->valuedouble) || (width != NULL && isinf(width->valuedouble)) || - (factor != NULL && isinf(factor->valuedouble)) || (count != NULL && isinf(count->valuedouble))) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg23); - } + if (isinf(start->valuedouble) || (width != NULL && isinf(width->valuedouble)) || + (factor != NULL && isinf(factor->valuedouble)) || (count != NULL && isinf(count->valuedouble))) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg23); + } - counter = (int32_t)count->valueint; - if (infinity->valueint == false) { - startIndex = 0; - numBins = counter + 1; - } else { - startIndex = 1; - numBins = counter + 3; - } + counter = (int32_t)count->valueint; + if (infinity->valueint == false) { + startIndex = 0; + numBins = counter + 1; + } else { + startIndex = 1; + numBins = counter + 3; + } - intervals = tcalloc(numBins, sizeof(double)); - if (cJSON_IsNumber(width) && factor == NULL && binType == LINEAR_BIN) { - // linear bin process - if (width->valuedouble == 0) { - tfree(intervals); - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg24); - } - for (int i = 0; i < counter + 1; ++i) { - intervals[startIndex] = start->valuedouble + i * width->valuedouble; - if (isinf(intervals[startIndex])) { - tfree(intervals); - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg23); - } - startIndex++; - } - } else if (cJSON_IsNumber(factor) && width == NULL && binType == LOG_BIN) { - // log bin process - if (start->valuedouble == 0) { - tfree(intervals); - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg26); - } - if (factor->valuedouble < 0 || factor->valuedouble == 0 || factor->valuedouble == 1) { - tfree(intervals); - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg27); - } - for (int i = 0; i < counter + 1; ++i) { - intervals[startIndex] = start->valuedouble * pow(factor->valuedouble, i * 1.0); - if (isinf(intervals[startIndex])) { - tfree(intervals); - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg23); - } - startIndex++; - } - } else { + intervals = tcalloc(numBins, sizeof(double)); + if (cJSON_IsNumber(width) && factor == NULL && binType == LINEAR_BIN) { + // linear bin process + if (width->valuedouble == 0) { tfree(intervals); - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg22); + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg24); } - - if (infinity->valueint == true) { - intervals[0] = -DBL_MAX; - intervals[numBins - 1] = DBL_MAX; - if (isinf(intervals[0]) || isinf(intervals[numBins - 1])) { + for (int i = 0; i < counter + 1; ++i) { + intervals[startIndex] = start->valuedouble + i * width->valuedouble; + if (isinf(intervals[startIndex])) { tfree(intervals); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg23); } - // in case of desc bin orders, -inf/inf should be swapped - assert(numBins >= 4); - if (intervals[1] > intervals[numBins - 2]) { - SWAP(intervals[0], intervals[numBins - 1], double); - } + startIndex++; } - - } else if (cJSON_IsArray(binDesc)) { /* user input bins */ - if (binType != USER_INPUT_BIN) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg22); + } else if (cJSON_IsNumber(factor) && width == NULL && binType == LOG_BIN) { + // log bin process + if (start->valuedouble == 0) { + tfree(intervals); + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg26); } - counter = numBins = cJSON_GetArraySize(binDesc); - intervals = tcalloc(numBins, sizeof(double)); - cJSON* bin = binDesc->child; - if (bin == NULL) { + if (factor->valuedouble < 0 || factor->valuedouble == 0 || factor->valuedouble == 1) { tfree(intervals); - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg22); + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg27); } - int i = 0; - while (bin) { - intervals[i] = bin->valuedouble; - if (!cJSON_IsNumber(bin)) { - tfree(intervals); - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg22); - } - if (i != 0 && intervals[i] <= intervals[i - 1]) { + for (int i = 0; i < counter + 1; ++i) { + intervals[startIndex] = start->valuedouble * pow(factor->valuedouble, i * 1.0); + if (isinf(intervals[startIndex])) { tfree(intervals); - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg22); + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg23); } - bin = bin->next; - i++; + startIndex++; } } else { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg21); + tfree(intervals); + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg22); } - int16_t resultType = pSchema->type; - int32_t resultSize = pSchema->bytes; - int32_t interResult = 0; - getResultDataInfo(pSchema->type, pSchema->bytes, functionId, counter, &resultType, &resultSize, &interResult, 0, - false, pUdfInfo); - SExprInfo* pExpr = NULL; - pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), interResult, - false); - numOutput = numBins - 1; - tscExprAddParams(&pExpr->base, (char*)&numOutput, TSDB_DATA_TYPE_INT, sizeof(int32_t)); - tscExprAddParams(&pExpr->base, (char*)intervals, TSDB_DATA_TYPE_BINARY, sizeof(double) * numBins); - tfree(intervals); - - // normalized param - char val[8] = {0}; - if (pParamElem[3].pNode->tokenId == TK_ID) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + if (infinity->valueint == true) { + intervals[0] = -DBL_MAX; + intervals[numBins - 1] = DBL_MAX; + if (isinf(intervals[0]) || isinf(intervals[numBins - 1])) { + tfree(intervals); + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg23); + } + // in case of desc bin orders, -inf/inf should be swapped + assert(numBins >= 4); + if (intervals[1] > intervals[numBins - 2]) { + SWAP(intervals[0], intervals[numBins - 1], double); + } } - pVariant = &pParamElem[3].pNode->value; - if (pVariant == NULL || pVariant->nType != TSDB_DATA_TYPE_BIGINT || - (pVariant->i64 != 0 && pVariant->i64 != 1)) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + } else if (cJSON_IsArray(binDesc)) { /* user input bins */ + if (binType != USER_INPUT_BIN) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg22); } - - if (tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true) < 0) { - return TSDB_CODE_TSC_INVALID_OPERATION; + counter = numBins = cJSON_GetArraySize(binDesc); + intervals = tcalloc(numBins, sizeof(double)); + cJSON* bin = binDesc->child; + if (bin == NULL) { + tfree(intervals); + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg22); } + int i = 0; + while (bin) { + intervals[i] = bin->valuedouble; + if (!cJSON_IsNumber(bin)) { + tfree(intervals); + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg22); + } + if (i != 0 && intervals[i] <= intervals[i - 1]) { + tfree(intervals); + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg22); + } + bin = bin->next; + i++; + } + } else { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg21); + } + + int16_t resultType = pSchema->type; + int32_t resultSize = pSchema->bytes; + int32_t interResult = 0; + getResultDataInfo(pSchema->type, pSchema->bytes, functionId, counter, &resultType, &resultSize, &interResult, 0, + false, pUdfInfo); + SExprInfo* pExpr = NULL; + pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), interResult, + false); + numOutput = numBins - 1; + tscExprAddParams(&pExpr->base, (char*)&numOutput, TSDB_DATA_TYPE_INT, sizeof(int32_t)); + tscExprAddParams(&pExpr->base, (char*)intervals, TSDB_DATA_TYPE_BINARY, sizeof(double) * numBins); + tfree(intervals); + + // normalized param + char val[8] = {0}; + if (pParamElem[3].pNode->tokenId == TK_ID) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + } - tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, LONG_BYTES); + pVariant = &pParamElem[3].pNode->value; + if (pVariant == NULL || pVariant->nType != TSDB_DATA_TYPE_BIGINT || + (pVariant->i64 != 0 && pVariant->i64 != 1)) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + } - memset(pExpr->base.aliasName, 0, tListLen(pExpr->base.aliasName)); - getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token, sizeof(pExpr->base.aliasName) - 1); - // todo refactor: tscColumnListInsert part - SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex); + if (tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true) < 0) { + return TSDB_CODE_TSC_INVALID_OPERATION; + } - if (finalResult) { - insertResultField(pQueryInfo, colIndex, &ids, resultSize, (int8_t)resultType, pExpr->base.aliasName, pExpr); - } else { - assert(ids.num == 1); - tscColumnListInsert(pQueryInfo->colList, ids.ids[0].columnIndex, pExpr->base.uid, pSchema); - } + tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, LONG_BYTES); - tscInsertPrimaryTsSourceColumn(pQueryInfo, pExpr->base.uid); - return TSDB_CODE_SUCCESS; + memset(pExpr->base.aliasName, 0, tListLen(pExpr->base.aliasName)); + getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token, sizeof(pExpr->base.aliasName) - 1); + // todo refactor: tscColumnListInsert part + SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex); + + if (finalResult) { + insertResultField(pQueryInfo, colIndex, &ids, resultSize, (int8_t)resultType, pExpr->base.aliasName, pExpr); + } else { + assert(ids.num == 1); + tscColumnListInsert(pQueryInfo->colList, ids.ids[0].columnIndex, pExpr->base.uid, pSchema); } - default: { - assert(!TSDB_FUNC_IS_SCALAR(functionId)); - pUdfInfo = isValidUdf(pQueryInfo->pUdfInfo, pItem->pNode->Expr.operand.z, pItem->pNode->Expr.operand.n); - if (pUdfInfo == NULL) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9); - } + tscInsertPrimaryTsSourceColumn(pQueryInfo, pExpr->base.uid); + return TSDB_CODE_SUCCESS; + } - if (pItem->pNode->Expr.paramList == NULL || taosArrayGetSize(pItem->pNode->Expr.paramList) <= 0) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg13); - } + default: { + assert(!TSDB_FUNC_IS_SCALAR(functionId)); + pUdfInfo = isValidUdf(pQueryInfo->pUdfInfo, pItem->pNode->Expr.operand.z, pItem->pNode->Expr.operand.n); + if (pUdfInfo == NULL) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9); + } - tSqlExprItem* pParamElem = taosArrayGet(pItem->pNode->Expr.paramList, 0); - ; - if (pParamElem->pNode->tokenId != TK_ID) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); - } + if (pItem->pNode->Expr.paramList == NULL || taosArrayGetSize(pItem->pNode->Expr.paramList) <= 0) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg13); + } - SColumnIndex index = COLUMN_INDEX_INITIALIZER; - if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != - TSDB_CODE_SUCCESS) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); - } + tSqlExprItem* pParamElem = taosArrayGet(pItem->pNode->Expr.paramList, 0); + ; + if (pParamElem->pNode->tokenId != TK_ID) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + } - if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); - } + SColumnIndex index = COLUMN_INDEX_INITIALIZER; + if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != + TSDB_CODE_SUCCESS) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); + } - pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); + if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); + } - // functions can not be applied to tags - if (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); - } + pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); - int32_t inter = 0; - int16_t resType = 0; - int32_t bytes = 0; - getResultDataInfo(TSDB_DATA_TYPE_INT, 4, functionId, 0, &resType, &bytes, &inter, 0, false, pUdfInfo); + // functions can not be applied to tags + if (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); + } - SExprInfo* pExpr = - tscExprAppend(pQueryInfo, functionId, &index, resType, bytes, getNewResColId(pCmd), inter, false); + int32_t inter = 0; + int16_t resType = 0; + int32_t bytes = 0; + getResultDataInfo(TSDB_DATA_TYPE_INT, 4, functionId, 0, &resType, &bytes, &inter, 0, false, pUdfInfo); - memset(pExpr->base.aliasName, 0, tListLen(pExpr->base.aliasName)); - getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token, sizeof(pExpr->base.aliasName) - 1); + SExprInfo* pExpr = + tscExprAppend(pQueryInfo, functionId, &index, resType, bytes, getNewResColId(pCmd), inter, false); - SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); + memset(pExpr->base.aliasName, 0, tListLen(pExpr->base.aliasName)); + getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token, sizeof(pExpr->base.aliasName) - 1); - uint64_t uid = pTableMetaInfo->pTableMeta->id.uid; - SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex); - if (finalResult) { - insertResultField(pQueryInfo, colIndex, &ids, pUdfInfo->resBytes, pUdfInfo->resType, pExpr->base.aliasName, - pExpr); - } else { - for (int32_t i = 0; i < ids.num; ++i) { - tscColumnListInsert(pQueryInfo->colList, index.columnIndex, uid, pSchema); - } + SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); + + uint64_t uid = pTableMetaInfo->pTableMeta->id.uid; + SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex); + if (finalResult) { + insertResultField(pQueryInfo, colIndex, &ids, pUdfInfo->resBytes, pUdfInfo->resType, pExpr->base.aliasName, + pExpr); + } else { + for (int32_t i = 0; i < ids.num; ++i) { + tscColumnListInsert(pQueryInfo->colList, index.columnIndex, uid, pSchema); } - tscInsertPrimaryTsSourceColumn(pQueryInfo, pTableMetaInfo->pTableMeta->id.uid); - return TSDB_CODE_SUCCESS; } + tscInsertPrimaryTsSourceColumn(pQueryInfo, pTableMetaInfo->pTableMeta->id.uid); + return TSDB_CODE_SUCCESS; } - - return TSDB_CODE_TSC_INVALID_OPERATION; } + + return TSDB_CODE_TSC_INVALID_OPERATION; } // todo refactor static SColumnList createColumnList(int32_t num, int16_t tableIndex, int32_t columnIndex) { diff --git a/src/query/inc/qAggMain.h b/src/query/inc/qAggMain.h index 2c09dfb7eb..23e52bfaf9 100644 --- a/src/query/inc/qAggMain.h +++ b/src/query/inc/qAggMain.h @@ -202,7 +202,7 @@ typedef struct SQLFunctionCtx { SPoint1 start; SPoint1 end; - int32_t maxUniqueResult; + SHashObj *pUniqueSet; // for unique function } SQLFunctionCtx; typedef struct SAggFunctionInfo { diff --git a/src/query/inc/qResultbuf.h b/src/query/inc/qResultbuf.h index 301c5e77d0..83662236b5 100644 --- a/src/query/inc/qResultbuf.h +++ b/src/query/inc/qResultbuf.h @@ -78,7 +78,8 @@ typedef struct SDiskbasedResultBuf { #define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes #define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1} -#define MAX_UNIQUE_RESULT_SIZE (1000000) +#define MAX_UNIQUE_RESULT_ROWS (10000) +#define MAX_UNIQUE_RESULT_SIZE (1024*1024*20) /** * create disk-based result buffer * @param pResultBuf diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index afc0441360..6f16255f14 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -225,19 +225,14 @@ typedef struct{ typedef struct { int64_t timestamp; - char * pTags; + char data[]; } UniqueUnit; typedef struct { - SHashObj *pSet; int32_t num; char res[]; } SUniqueFuncInfo; -void freeUniqueUnit(void* unit){ - tfree(((UniqueUnit *)unit)->pTags); -} - int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type, int32_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) { if (!isValidDataType(dataType)) { @@ -371,7 +366,13 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI return TSDB_CODE_SUCCESS; } else if (functionId == TSDB_FUNC_UNIQUE) { *type = TSDB_DATA_TYPE_BINARY; - *bytes = (sizeof(SUniqueFuncInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param); + int64_t size = sizeof(UniqueUnit) + dataBytes + extLength; + size *= param; + size += sizeof(SUniqueFuncInfo); + if (size > MAX_UNIQUE_RESULT_SIZE){ + size = MAX_UNIQUE_RESULT_SIZE; + } + *bytes = size; *interBytes = *bytes; return TSDB_CODE_SUCCESS; @@ -498,17 +499,19 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI *type = (int16_t)dataType; *bytes = dataBytes; - size_t size = sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param; + size_t size = sizeof(STopBotInfo) + (sizeof(tValuePair) + extLength) * param; // the output column may be larger than sizeof(STopBotInfo) *interBytes = (int32_t)size; } else if (functionId == TSDB_FUNC_UNIQUE) { *type = (int16_t)dataType; *bytes = dataBytes; - - size_t size = sizeof(SUniqueFuncInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param; - - // the output column may be larger than sizeof(STopBotInfo) + int64_t size = sizeof(UniqueUnit) + dataBytes + extLength; + size *= param; + size += sizeof(SUniqueFuncInfo); + if (size > MAX_UNIQUE_RESULT_SIZE){ + size = MAX_UNIQUE_RESULT_SIZE; + } *interBytes = (int32_t)size; } else if (functionId == TSDB_FUNC_SAMPLE) { *type = (int16_t)dataType; @@ -5143,87 +5146,19 @@ static void copyUniqueRes(SQLFunctionCtx *pCtx, int32_t type) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); SUniqueFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); - size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen; + size_t size = sizeof(UniqueUnit) + pCtx->inputBytes + pCtx->tagInfo.tagsLen; char *tvp = pRes->res; int32_t len = (int32_t)(GET_RES_INFO(pCtx)->numOfRes); - switch (type) { - case TSDB_DATA_TYPE_UINT: - case TSDB_DATA_TYPE_INT: { - int32_t *output = (int32_t *)pCtx->pOutput; - for (int32_t i = 0; i < len; ++i, output ++) { - *output = ((tValuePair *)tvp)->v.i64; - tvp += size; - } - break; - } - case TSDB_DATA_TYPE_UBIGINT: - case TSDB_DATA_TYPE_BIGINT: - case TSDB_DATA_TYPE_TIMESTAMP:{ - int64_t *output = (int64_t *)pCtx->pOutput; - for (int32_t i = 0; i < len; ++i, output ++) { - *output = ((tValuePair *)tvp)->v.i64; - tvp += size; - } - break; - } - case TSDB_DATA_TYPE_DOUBLE: { - double *output = (double *)pCtx->pOutput; - for (int32_t i = 0; i < len; ++i, output ++) { - *output = ((tValuePair *)tvp)->v.dKey; - tvp += size; - } - break; - } - case TSDB_DATA_TYPE_FLOAT: { - float *output = (float *)pCtx->pOutput; - for (int32_t i = 0; i < len; ++i, output ++) { - *output = ((tValuePair *)tvp)->v.dKey; - tvp += size; - } - break; - } - case TSDB_DATA_TYPE_USMALLINT: - case TSDB_DATA_TYPE_SMALLINT: { - int16_t *output = (int16_t *)pCtx->pOutput; - for (int32_t i = 0; i < len; ++i, output ++) { - *output = ((tValuePair *)tvp)->v.i64; - tvp += size; - } - break; - } - case TSDB_DATA_TYPE_UTINYINT: - case TSDB_DATA_TYPE_TINYINT: - case TSDB_DATA_TYPE_BOOL:{ - int8_t *output = (int8_t *)pCtx->pOutput; - for (int32_t i = 0; i < len; ++i, output ++) { - *output = ((tValuePair *)tvp)->v.i64; - tvp += size; - } - break; - } - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: { - char *output = pCtx->pOutput; - for (int32_t i = 0; i < len; ++i, output += pCtx->outputBytes) { - *output = ((tValuePair *)tvp)->v.i64; - memcpy(output, ((tValuePair *)tvp)->v.pz, ((tValuePair *)tvp)->v.nLen); - tvp += size; - } - break; - } - default: { - qError("unique function not support data type:%d", pCtx->inputType); - return; - } - } - - // set the output timestamp of each record. - TSKEY *output = pCtx->ptsOutputBuf; - for (int32_t i = 0; i < len; ++i, output ++) { - *output = ((tValuePair *)tvp)->timestamp; + char *tsOutput = pCtx->ptsOutputBuf; + char *output = pCtx->pOutput; + for (int32_t i = 0; i < len; ++i) { + memcpy(tsOutput, tvp, sizeof(int64_t)); + memcpy(output, tvp + sizeof(UniqueUnit), pCtx->inputBytes); tvp += size; + tsOutput += sizeof(int64_t); + output += pCtx->inputBytes; } // set the corresponding tag data for each record @@ -5237,14 +5172,15 @@ static void copyUniqueRes(SQLFunctionCtx *pCtx, int32_t type) { pData[i] = pCtx->tagInfo.pTagCtxList[i]->pOutput; } - for (int32_t i = 0; i < len; ++i, output ++) { - int16_t offset = 0; + tvp = pRes->res; + for (int32_t i = 0; i < len; ++i) { + int16_t offset = sizeof(UniqueUnit) + pCtx->inputBytes; for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) { - memcpy(pData[j], ((tValuePair *)tvp)->pTags + offset, (size_t)pCtx->tagInfo.pTagCtxList[j]->outputBytes); + memcpy(pData[j], tvp + offset, (size_t)pCtx->tagInfo.pTagCtxList[j]->outputBytes); offset += pCtx->tagInfo.pTagCtxList[j]->outputBytes; pData[j] += pCtx->tagInfo.pTagCtxList[j]->outputBytes; - tvp += size; } + tvp += size; } tfree(pData); @@ -5254,69 +5190,79 @@ static bool unique_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pRes if (!function_setup(pCtx, pResInfo)) { return false; } - SUniqueFuncInfo *uniqueInfo = getUniqueOutputInfo(pCtx); - uniqueInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - taosHashSetFreeFp(uniqueInfo->pSet, freeUniqueUnit); - return true; } -static void unique_function(SQLFunctionCtx *pCtx) { - SUniqueFuncInfo *pInfo = getUniqueOutputInfo(pCtx); - - for (int32_t i = 0; i < pCtx->size; i++) { - char *pData = GET_INPUT_DATA(pCtx, i); - TSKEY k = 0; - if (pCtx->ptsList != NULL) { - k = GET_TS_DATA(pCtx, i); - } - tValuePair *unique = taosHashGet(pInfo->pSet, pData, pCtx->inputBytes); - if (unique == NULL) { - size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen; - tValuePair *tmp = (tValuePair *)(pInfo->res + pInfo->num * size); - if (pCtx->inputType == TSDB_DATA_TYPE_BINARY || pCtx->inputType == TSDB_DATA_TYPE_NCHAR) { - tVariantCreateFromBinary(&tmp->v, varDataVal(pData), varDataLen(pData), pCtx->inputType); - }else{ - tVariantCreateFromBinary(&tmp->v, pData, 0, pCtx->inputType); - } - tmp->timestamp = k; +static void do_unique_function(SQLFunctionCtx *pCtx, SUniqueFuncInfo *pInfo, TSKEY timestamp, char *pData, char *tag){ + tValuePair *unique = taosHashGet(pCtx->pUniqueSet, pData, pCtx->inputBytes); + if (unique == NULL) { + size_t size = sizeof(UniqueUnit) + pCtx->inputBytes + pCtx->tagInfo.tagsLen; + char *tmp = pInfo->res + pInfo->num * size; + ((UniqueUnit*)tmp)->timestamp = timestamp; + char *data = tmp + sizeof(UniqueUnit); + char *tags = tmp + sizeof(UniqueUnit) + pCtx->inputBytes; + memcpy(data, pData, pCtx->inputBytes); + if (pCtx->currentStage == MERGE_STAGE && tag != NULL) { + memcpy(tags, tag, (size_t)pCtx->tagInfo.tagsLen); + }else{ int32_t offset = 0; for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) { SQLFunctionCtx *tagCtx = pCtx->tagInfo.pTagCtxList[j]; if (tagCtx->functionId == TSDB_FUNC_TAG_DUMMY) { aAggs[TSDB_FUNC_TAG].xFunction(tagCtx); - memcpy(tmp->pTags + offset, tagCtx->pOutput, tagCtx->outputBytes); + memcpy(tags + offset, tagCtx->pOutput, tagCtx->outputBytes); offset += tagCtx->outputBytes; } } + } + + taosHashPut(pCtx->pUniqueSet, pData, pCtx->inputBytes, &tmp, sizeof(tValuePair*)); + pInfo->num++; + }else if(unique->timestamp > timestamp){ + unique->timestamp = timestamp; + } +} + +static void unique_function(SQLFunctionCtx *pCtx) { + SUniqueFuncInfo *pInfo = getUniqueOutputInfo(pCtx); - taosHashPut(pInfo->pSet, pData, pCtx->inputBytes, &tmp, sizeof(tValuePair*)); - pInfo->num++; - }else if(unique->timestamp > k){ - unique->timestamp = k; + for (int32_t i = 0; i < pCtx->size; i++) { + char *pData = GET_INPUT_DATA(pCtx, i); + TSKEY k = 0; + if (pCtx->ptsList != NULL) { + k = GET_TS_DATA(pCtx, i); + } + do_unique_function(pCtx, pInfo, k, pData, NULL); + + if (sizeof(SUniqueFuncInfo) + pInfo->num * (sizeof(UniqueUnit) + pCtx->inputBytes + pCtx->tagInfo.tagsLen) >= MAX_UNIQUE_RESULT_SIZE){ + GET_RES_INFO(pCtx)->numOfRes = -1; // mark out of memory + return; } } + GET_RES_INFO(pCtx)->numOfRes = 1; } static void unique_function_merge(SQLFunctionCtx *pCtx) { - //SUniqueFuncInfo *pInput = (SUniqueFuncInfo *)GET_INPUT_DATA_LIST(pCtx); - //SUniqueFuncInfo *pOutput = getUniqueOutputInfo(pCtx); - // the intermediate result is binary, we only use the output data type -// for (int32_t i = 0; i < pInput->num; ++i) { -// int16_t type = (pCtx->outputType == TSDB_DATA_TYPE_FLOAT)? TSDB_DATA_TYPE_DOUBLE:pCtx->outputType; -// do_top_function_add(pOutput, (int32_t)pCtx->param[0].i64, &pInput->res[i]->v.i64, pInput->res[i]->timestamp, -// type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage); -// } -// -// SET_VAL(pCtx, pInput->num, pOutput->num); -// -// if (pOutput->num > 0) { -// SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); -// pResInfo->hasResult = DATA_SET_FLAG; -// } + SUniqueFuncInfo *pInput = (SUniqueFuncInfo *)GET_INPUT_DATA_LIST(pCtx); + SUniqueFuncInfo *pOutput = getUniqueOutputInfo(pCtx); + size_t size = sizeof(UniqueUnit) + pCtx->inputBytes + pCtx->tagInfo.tagsLen; + for (int32_t i = 0; i < pInput->num; ++i) { + char *tmp = pInput->res + i* size; + TSKEY timestamp = ((UniqueUnit*)tmp)->timestamp; + char *data = tmp + sizeof(UniqueUnit); + char *tags = tmp + sizeof(UniqueUnit) + pCtx->inputBytes; + do_unique_function(pCtx, pOutput, timestamp, data, tags); + + if (sizeof(SUniqueFuncInfo) + pOutput->num * (sizeof(UniqueUnit) + pCtx->inputBytes + pCtx->tagInfo.tagsLen) >= MAX_UNIQUE_RESULT_SIZE){ + GET_RES_INFO(pCtx)->numOfRes = -1; // mark out of memory + return; + } + } + + GET_RES_INFO(pCtx)->numOfRes = pOutput->num; } static void unique_func_finalizer(SQLFunctionCtx *pCtx) { @@ -5328,7 +5274,6 @@ static void unique_func_finalizer(SQLFunctionCtx *pCtx) { doFinalizer(pCtx); } - ///////////////////////////////////////////////////////////////////////////////////////////// /* * function compatible list. diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index c18d515bc0..65acec8a20 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -356,7 +356,13 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO idata.info.bytes = pExpr[i].base.resBytes; idata.info.colId = pExpr[i].base.resColId; - int32_t size = MAX(idata.info.bytes * numOfRows, minSize); + int64_t tmp = idata.info.bytes; + tmp *= numOfRows; + if (tmp >= 1024*1024*1024) { // 1G + qError("size is too large, failed to allocate column buffer for output buffer"); + goto _clean; + } + int32_t size = MAX(tmp, minSize); idata.pData = calloc(1, size); // at least to hold a pointer on x64 platform if (idata.pData == NULL) { qError("failed to allocate column buffer for output buffer"); @@ -1003,10 +1009,10 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx } } - if (functionId == TSDB_FUNC_UNIQUE && GET_RES_INFO(&(pCtx[k]))->numOfRes > pQueryAttr->maxUniqueResult){ + if (functionId == TSDB_FUNC_UNIQUE && + (GET_RES_INFO(&(pCtx[k]))->numOfRes > MAX_UNIQUE_RESULT_ROWS || GET_RES_INFO(&(pCtx[k]))->numOfRes == -1)){ qError("Unique result num is too large. num: %d, limit: %d", - GET_RES_INFO(&(pCtx[k]))->numOfRes, pQueryAttr->maxUniqueResult); - aAggs[functionId].xFinalize(&pCtx[k]); + GET_RES_INFO(&(pCtx[k]))->numOfRes, MAX_UNIQUE_RESULT_ROWS); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE); } @@ -1271,10 +1277,10 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction } SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - if (functionId == TSDB_FUNC_UNIQUE && GET_RES_INFO(&(pCtx[k]))->numOfRes > pQueryAttr->maxUniqueResult){ + if (functionId == TSDB_FUNC_UNIQUE && + (GET_RES_INFO(&(pCtx[k]))->numOfRes > MAX_UNIQUE_RESULT_ROWS || GET_RES_INFO(&(pCtx[k]))->numOfRes == -1)){ qError("Unique result num is too large. num: %d, limit: %d", - GET_RES_INFO(&(pCtx[k]))->numOfRes, pQueryAttr->maxUniqueResult); - aAggs[functionId].xFinalize(&pCtx[k]); + GET_RES_INFO(&(pCtx[k]))->numOfRes, MAX_UNIQUE_RESULT_ROWS); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE); } } @@ -1976,9 +1982,6 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr pCtx->end.key = INT64_MIN; pCtx->startTs = INT64_MIN; - if (pCtx->functionId == TSDB_FUNC_UNIQUE){ - pCtx->maxUniqueResult = pQueryAttr->maxUniqueResult; - } pCtx->numOfParams = pSqlExpr->numOfParams; for (int32_t j = 0; j < pCtx->numOfParams; ++j) { int16_t type = pSqlExpr->param[j].nType; @@ -2028,6 +2031,8 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; } else if (functionId == TSDB_FUNC_SCALAR_EXPR) { pCtx->param[1].pz = (char*) &pRuntimeEnv->sasArray[i]; + } else if (functionId == TSDB_FUNC_UNIQUE){ + pCtx->pUniqueSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); } } @@ -2052,6 +2057,9 @@ static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) { tVariantDestroy(&pCtx[i].tag); tfree(pCtx[i].tagInfo.pTagCtxList); + if (pCtx[i].functionId == TSDB_FUNC_UNIQUE){ + taosHashClear(pCtx[i].pUniqueSet); + } } tfree(pCtx); @@ -2771,15 +2779,7 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t MIN_ROWS_PER_PAGE = 4; - if (pQueryAttr->uniqueQuery) { - int64_t rowSize = pQueryAttr->resultRowSize; - while(rowSize*pQueryAttr->maxUniqueResult > 1024*1024*100){ - pQueryAttr->maxUniqueResult = pQueryAttr->maxUniqueResult >> 1u; - } - *rowsize = (int32_t)(rowSize*pQueryAttr->maxUniqueResult); - }else{ - *rowsize = (int32_t)(pQueryAttr->resultRowSize * getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); - } + *rowsize = (int32_t)(pQueryAttr->resultRowSize * getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); int32_t overhead = sizeof(tFilePage); // one page contains at least two rows @@ -8994,7 +8994,7 @@ static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SCol for (int32_t i = 0; i < numOfOutput; ++i) { int16_t functId = pExprs[i].base.functionId; - if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM || functId == TSDB_FUNC_SAMPLE) { + if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM || functId == TSDB_FUNC_SAMPLE || funcIf == TSDB_FUNC_UNIQUE) { int32_t j = getColumnIndexInSource(pTableInfo, &pExprs[i].base, pTagCols); if (j < 0 || j >= pTableInfo->numOfCols) { return TSDB_CODE_QRY_INVALID_MSG; @@ -9580,7 +9580,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S pQueryAttr->pFilters = pFilters; pQueryAttr->range = pQueryMsg->range; pQueryAttr->uniqueQuery = isUniqueQuery(numOfOutput, pExprs); - pQueryAttr->maxUniqueResult = MAX_UNIQUE_RESULT_SIZE; pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); if (pQueryAttr->tableCols == NULL) { diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 6afa111a9a..36e97f9fcf 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -45,7 +45,7 @@ int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, boo } if (pQueryAttr->uniqueQuery){ - return pQueryAttr->maxUniqueResult; + return MAX_UNIQUE_RESULT_ROWS; } return 1; } -- GitLab