diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 3dfaae820e0be57947569fd46c99e53c3effb214..1f84fa27d7ccfc32337365295b80da873c1053f9 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -242,7 +242,7 @@ SExprInfo* tscExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnInde int16_t size, int16_t resColId, int16_t interSize, bool isTagCol); SExprInfo* tscExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type, - int16_t size); + int32_t size); size_t tscNumOfExprs(SQueryInfo* pQueryInfo); int32_t tscExprTopBottomIndex(SQueryInfo* pQueryInfo); diff --git a/src/client/src/tscGlobalmerge.c b/src/client/src/tscGlobalmerge.c index 02c5604ab427efca2227c90947c9ae80a84892fe..d01e1fcae3b4824959dced85f31b3cc252cda6c5 100644 --- a/src/client/src/tscGlobalmerge.c +++ b/src/client/src/tscGlobalmerge.c @@ -440,6 +440,15 @@ int32_t tscCreateGlobalMergerEnv(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBu rlen += pExpr->base.resBytes; } + int32_t pg = DEFAULT_PAGE_SIZE; + int32_t overhead = sizeof(tFilePage); + while((pg - overhead) < rlen * 2) { + pg *= 2; + } + + if (*nBufferSizes < pg){ + *nBufferSizes = 2 * pg; + } int32_t capacity = 0; if (rlen != 0) { if ((*nBufferSizes) < rlen) { @@ -447,19 +456,13 @@ int32_t tscCreateGlobalMergerEnv(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBu } capacity = (*nBufferSizes) / rlen; } - + pModel = createColumnModel(pSchema, (int32_t)size, capacity); tfree(pSchema); if (pModel == NULL){ return TSDB_CODE_TSC_OUT_OF_MEMORY; } - int32_t pg = DEFAULT_PAGE_SIZE; - int32_t overhead = sizeof(tFilePage); - while((pg - overhead) < pModel->rowSize * 2) { - pg *= 2; - } - assert(numOfSub <= pTableMetaInfo->vgroupList->numOfVgroups); for (int32_t i = 0; i < numOfSub; ++i) { (*pMemBuffer)[i] = createExtMemBuffer(*nBufferSizes, rlen, pg, pModel); @@ -593,7 +596,7 @@ static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput } } -static 
void doMergeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr, int32_t rowIndex, char** pDataPtr) { +static void doMergeResultImpl(SOperatorInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr, int32_t rowIndex, char** pDataPtr) { for (int32_t j = 0; j < numOfExpr; ++j) { pCtx[j].pInput = pDataPtr[j] + pCtx[j].inputBytes * rowIndex; } @@ -605,12 +608,19 @@ static void doMergeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, i } if (functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); + SUdfInfo* pUdfInfo = taosArrayGet(((SMultiwayMergeInfo*)(pInfo->info))->udfInfo, -1 * functionId - 1); doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE); } else { assert(!TSDB_FUNC_IS_SCALAR(functionId)); aAggs[functionId].mergeFunc(&pCtx[j]); } + + if (functionId == TSDB_FUNC_UNIQUE && + (GET_RES_INFO(&(pCtx[j]))->numOfRes > MAX_UNIQUE_RESULT_ROWS || GET_RES_INFO(&(pCtx[j]))->numOfRes == -1)){ + tscError("Unique result num is too large. 
num: %d, limit: %d", + GET_RES_INFO(&(pCtx[j]))->numOfRes, MAX_UNIQUE_RESULT_ROWS); + longjmp(pInfo->pRuntimeEnv->env, TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE); + } } } @@ -644,7 +654,7 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD for(int32_t i = 0; i < pBlock->info.rows; ++i) { if (pInfo->hasPrev) { if (needToMerge(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) { - doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr); + doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr); } else { doFinalizeResultImpl(pInfo, pCtx, numOfExpr); @@ -656,7 +666,7 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD for(int32_t j = 0; j < numOfExpr; ++j) { pCtx[j].pOutput += (pCtx[j].outputBytes * numOfRows); if (pCtx[j].functionId == TSDB_FUNC_TOP || pCtx[j].functionId == TSDB_FUNC_BOTTOM || - pCtx[j].functionId == TSDB_FUNC_SAMPLE) { + pCtx[j].functionId == TSDB_FUNC_SAMPLE || pCtx[j].functionId == TSDB_FUNC_UNIQUE) { if(j > 0) pCtx[j].ptsOutputBuf = pCtx[j - 1].pOutput; } } @@ -671,10 +681,10 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD } } - doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr); + doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr); } } else { - doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr); + doMergeResultImpl(pOperator, pCtx, numOfExpr, i, addrPtr); } savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index b18b76c9672e6bebf02a4d2fa064b16192f37630..5e61d8e0c1b8e23976933260f84efdb168099642 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1065,13 +1065,14 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { * if the top/bottom exists, only tags columns, tbname column, and primary timestamp column * are available. 
*/ -static bool isTopBottomQuery(SQueryInfo* pQueryInfo) { +static bool isTopBottomUniqueQuery(SQueryInfo* pQueryInfo) { size_t size = tscNumOfExprs(pQueryInfo); for (int32_t i = 0; i < size; ++i) { int32_t functionId = tscExprGet(pQueryInfo, i)->base.functionId; - if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { + if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM + || functionId == TSDB_FUNC_UNIQUE) { return true; } } @@ -1112,20 +1113,7 @@ static int32_t addPrimaryTsColumnForTimeWindowQuery(SQueryInfo* pQueryInfo, SSql static int32_t checkInvalidExprForTimeWindow(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { const char* msg1 = "invalid query expression"; const char* msg2 = "top/bottom query does not support order by value in time window query"; - - // for top/bottom + interval query, we do not add additional timestamp column in the front - if (isTopBottomQuery(pQueryInfo)) { - - // invalid sql: - // top(col, k) from table_name [interval(1d)|session(ts, 1d)] order by k asc - // order by normal column is not supported - int32_t colId = pQueryInfo->order.orderColId; - if (isTimeWindowQuery(pQueryInfo) && colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); - } - - return TSDB_CODE_SUCCESS; - } + const char* msg3 = "unique function does not supportted in time window query"; /* * invalid sql: @@ -1137,6 +1125,9 @@ static int32_t checkInvalidExprForTimeWindow(SSqlCmd* pCmd, SQueryInfo* pQueryIn if (pExpr->base.functionId == TSDB_FUNC_COUNT && TSDB_COL_IS_TAG(pExpr->base.colInfo.flag)) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); } + if (pExpr->base.functionId == TSDB_FUNC_UNIQUE) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); + } } /* @@ -1147,6 +1138,20 @@ static int32_t checkInvalidExprForTimeWindow(SSqlCmd* pCmd, SQueryInfo* pQueryIn return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); } + // for top/bottom + interval query, we do 
not add additional timestamp column in the front + if (isTopBottomUniqueQuery(pQueryInfo)) { + + // invalid sql: + // top(col, k) from table_name [interval(1d)|session(ts, 1d)] order by k asc + // order by normal column is not supported + int32_t colId = pQueryInfo->order.orderColId; + if (isTimeWindowQuery(pQueryInfo) && colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + } + + return TSDB_CODE_SUCCESS; + } + return addPrimaryTsColumnForTimeWindowQuery(pQueryInfo, pCmd); } @@ -1225,7 +1230,7 @@ static int32_t validateStateWindowNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS const char* msg1 = "invalid column name"; const char* msg2 = "invalid column type"; const char* msg3 = "not support state_window with group by "; - const char* msg4 = "function not support for super table query"; + const char* msg4 = "state_window not support for super table query"; const char* msg5 = "not support state_window on tag column"; const char* msg6 = "function not support for state_window"; @@ -2658,7 +2663,7 @@ static UNUSED_FUNC void updateFunctionInterBuf(SQueryInfo* pQueryInfo, bool supe int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t colIndex, tSqlExprItem* pItem, bool finalResult, SUdfInfo* pUdfInfo) { STableMetaInfo* pTableMetaInfo = NULL; - int32_t functionId = pItem->pNode->functionId; + int32_t functionId = pItem->pNode->functionId; const char* msg1 = "unsupported column types"; const char* msg2 = "invalid parameters"; @@ -2688,7 +2693,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col const char* msg26 = "start param cannot be 0 with 'log_bin'"; const char* msg27 = "factor param cannot be negative or equal to 0/1"; const char* msg28 = "the second paramter of diff should be 0 or 1"; - + const char* msg29 = "key timestamp column cannot be used to unique function"; switch (functionId) { case TSDB_FUNC_COUNT: { @@ -2697,13 +2702,13 @@ int32_t 
addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); } - SExprInfo* pExpr = NULL; + SExprInfo* pExpr = NULL; SColumnIndex index = COLUMN_INDEX_INITIALIZER; if (pItem->pNode->Expr.paramList != NULL) { tSqlExprItem* pParamElem = taosArrayGet(pItem->pNode->Expr.paramList, 0); - SStrToken* pToken = &pParamElem->pNode->columnName; - int16_t tokenId = pParamElem->pNode->tokenId; + SStrToken* pToken = &pParamElem->pNode->columnName; + int16_t tokenId = pParamElem->pNode->tokenId; if ((pToken->z == NULL || pToken->n == 0) && (TK_INTEGER != tokenId)) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); } @@ -2719,7 +2724,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; int32_t size = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; - pExpr = tscExprAppend(pQueryInfo, functionId, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pCmd), size, false); + pExpr = tscExprAppend(pQueryInfo, functionId, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pCmd), size, + false); } else { // count the number of table created according to the super table if (getColumnIndexByName(pToken, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) { @@ -2730,34 +2736,38 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col // count tag is equalled to count(tbname) bool isTag = false; - if (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta) || index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { + if (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta) || + index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { index.columnIndex = TSDB_TBNAME_COLUMN_INDEX; isTag = true; } int32_t size = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; - pExpr = tscExprAppend(pQueryInfo, functionId, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pCmd), size, 
isTag); + pExpr = tscExprAppend(pQueryInfo, functionId, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pCmd), size, + isTag); } } else { // count(*) is equalled to count(primary_timestamp_key) index = (SColumnIndex){0, PRIMARYKEY_TIMESTAMP_COL_INDEX}; int32_t size = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; - pExpr = tscExprAppend(pQueryInfo, functionId, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pCmd), size, false); + pExpr = tscExprAppend(pQueryInfo, functionId, &index, TSDB_DATA_TYPE_BIGINT, size, getNewResColId(pCmd), size, + false); } pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); memset(pExpr->base.aliasName, 0, tListLen(pExpr->base.aliasName)); - getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token,sizeof(pExpr->base.aliasName) - 1); + getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token, sizeof(pExpr->base.aliasName) - 1); SColumnList list = createColumnList(1, index.tableIndex, index.columnIndex); if (finalResult) { int32_t numOfOutput = tscNumOfFields(pQueryInfo); - insertResultField(pQueryInfo, numOfOutput, &list, sizeof(int64_t), TSDB_DATA_TYPE_BIGINT, pExpr->base.aliasName, pExpr); + insertResultField(pQueryInfo, numOfOutput, &list, sizeof(int64_t), TSDB_DATA_TYPE_BIGINT, pExpr->base.aliasName, + pExpr); } else { for (int32_t i = 0; i < list.num; ++i) { SSchema* ps = tscGetTableSchema(pTableMetaInfo->pTableMeta); tscColumnListInsert(pQueryInfo->colList, list.ids[i].columnIndex, pTableMetaInfo->pTableMeta->id.uid, - &ps[list.ids[i].columnIndex]); + &ps[list.ids[i].columnIndex]); } } @@ -2783,12 +2793,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col case TSDB_FUNC_LEASTSQR: case TSDB_FUNC_ELAPSED: { // 1. valid the number of parameters - int32_t numOfParams = (pItem->pNode->Expr.paramList == NULL)? 0: (int32_t) taosArrayGetSize(pItem->pNode->Expr.paramList); + int32_t numOfParams = + (pItem->pNode->Expr.paramList == NULL) ? 
0 : (int32_t)taosArrayGetSize(pItem->pNode->Expr.paramList); // no parameters or more than one parameter for function if (pItem->pNode->Expr.paramList == NULL || - (functionId != TSDB_FUNC_LEASTSQR && functionId != TSDB_FUNC_DERIVATIVE && functionId != TSDB_FUNC_ELAPSED && functionId != TSDB_FUNC_DIFF - && numOfParams != 1) || + (functionId != TSDB_FUNC_LEASTSQR && functionId != TSDB_FUNC_DERIVATIVE && functionId != TSDB_FUNC_ELAPSED && + functionId != TSDB_FUNC_DIFF && numOfParams != 1) || ((functionId == TSDB_FUNC_LEASTSQR || functionId == TSDB_FUNC_DERIVATIVE) && numOfParams != 3) || (functionId == TSDB_FUNC_ELAPSED && numOfParams != 1 && numOfParams != 2) || (functionId == TSDB_FUNC_DIFF && numOfParams != 1 && numOfParams != 2)) { @@ -2796,12 +2807,14 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col } tSqlExprItem* pParamElem = taosArrayGet(pItem->pNode->Expr.paramList, 0); - if ((pParamElem->pNode->tokenId != TK_ALL && pParamElem->pNode->tokenId != TK_ID) || 0 == pParamElem->pNode->columnName.n) { + if ((pParamElem->pNode->tokenId != TK_ALL && pParamElem->pNode->tokenId != TK_ID) || + 0 == pParamElem->pNode->columnName.n) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); } SColumnIndex index = COLUMN_INDEX_INITIALIZER; - if ((getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS)) { + if ((getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != + TSDB_CODE_SUCCESS)) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); } @@ -2810,13 +2823,14 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col // elapsed only can be applied to primary key if (functionId == TSDB_FUNC_ELAPSED) { - if ( index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX || pColumnSchema->colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) { + if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX || 
+ pColumnSchema->colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), "elapsed only can be applied to primary key"); } } - //for timeline related aggregation function like elapsed and twa, groupby in subquery is not allowed - //as calculation result is meaningless by mixing different childtables(timelines) results. + // for timeline related aggregation function like elapsed and twa, groupby in subquery is not allowed + // as calculation result is meaningless by mixing different childtables(timelines) results. if ((functionId == TSDB_FUNC_ELAPSED || functionId == TSDB_FUNC_TWA) && pQueryInfo->pUpstream != NULL) { size_t numOfUpstreams = taosArrayGetSize(pQueryInfo->pUpstream); for (int32_t i = 0; i < numOfUpstreams; ++i) { @@ -2830,7 +2844,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col STableComInfo info = tscGetTableInfo(pTableMetaInfo->pTableMeta); // functions can not be applied to tags - if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX || (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta))) { + if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX || + (index.columnIndex >= tscGetNumOfColumns(pTableMetaInfo->pTableMeta))) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); } @@ -2839,7 +2854,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId != TSDB_FUNC_ELAPSED)) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); - } else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) && (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE)) { + } else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) && + (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE)) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9); } @@ -2856,16 +2872,18 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col if (functionId == 
TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE || functionId == TSDB_FUNC_CSUM) { SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0}; SExprInfo* pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP, - TSDB_KEYSIZE, 0, TSDB_KEYSIZE, false); + TSDB_KEYSIZE, 0, TSDB_KEYSIZE, false); tstrncpy(pExpr->base.aliasName, aAggs[TSDB_FUNC_TS_DUMMY].name, sizeof(pExpr->base.aliasName)); SColumnList ids = createColumnList(1, 0, 0); - insertResultField(pQueryInfo, colIndex, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].name, pExpr); + insertResultField(pQueryInfo, colIndex, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, + aAggs[TSDB_FUNC_TS_DUMMY].name, pExpr); } - SExprInfo* pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), intermediateResSize, false); + SExprInfo* pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), + intermediateResSize, false); - if (functionId == TSDB_FUNC_LEASTSQR) { // set the leastsquares parameters + if (functionId == TSDB_FUNC_LEASTSQR) { // set the leastsquares parameters char val[8] = {0}; if (tVariantDump(&pParamElem[1].pNode->value, val, TSDB_DATA_TYPE_DOUBLE, true) < 0) { return TSDB_CODE_TSC_INVALID_OPERATION; @@ -2886,14 +2904,15 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col char val[8] = {0}; int64_t tickPerSec = 0; - char *exprToken = tcalloc(pParamElem[1].pNode->exprToken.n + 1, sizeof(char)); + char* exprToken = tcalloc(pParamElem[1].pNode->exprToken.n + 1, sizeof(char)); memcpy(exprToken, pParamElem[1].pNode->exprToken.z, pParamElem[1].pNode->exprToken.n); if (pParamElem[1].pNode->exprToken.type == TK_NOW || strstr(exprToken, "now")) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); } tfree(exprToken); - if ((TSDB_DATA_TYPE_NULL == pParamElem[1].pNode->value.nType) || 
tVariantDump(&pParamElem[1].pNode->value, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, true) < 0) { + if ((TSDB_DATA_TYPE_NULL == pParamElem[1].pNode->value.nType) || + tVariantDump(&pParamElem[1].pNode->value, (char*)&tickPerSec, TSDB_DATA_TYPE_BIGINT, true) < 0) { return TSDB_CODE_TSC_INVALID_OPERATION; } @@ -2909,7 +2928,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg16); } - tscExprAddParams(&pExpr->base, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, LONG_BYTES); + tscExprAddParams(&pExpr->base, (char*)&tickPerSec, TSDB_DATA_TYPE_BIGINT, LONG_BYTES); if (functionId == TSDB_FUNC_DERIVATIVE) { memset(val, 0, tListLen(val)); @@ -2917,7 +2936,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return TSDB_CODE_TSC_INVALID_OPERATION; } - int64_t v = *(int64_t*) val; + int64_t v = *(int64_t*)val; if (v != 0 && v != 1) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg11); } @@ -2944,7 +2963,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex); memset(pExpr->base.aliasName, 0, tListLen(pExpr->base.aliasName)); - getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token,sizeof(pExpr->base.aliasName) - 1); + getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token, sizeof(pExpr->base.aliasName) - 1); if (finalResult) { int32_t numOfOutput = tscNumOfFields(pQueryInfo); @@ -2968,9 +2987,9 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col // NOTE: has time range condition or normal column filter condition, the last_row query will be transferred to last query SConvertFunc cvtFunc = {.originFuncId = functionId, .execFuncId = functionId}; - if (functionId == TSDB_FUNC_LAST_ROW && ((!TSWINDOW_IS_EQUAL(pQueryInfo->window, TSWINDOW_INITIALIZER)) || - (hasNormalColumnFilter(pQueryInfo)) || - 
taosArrayGetSize(pQueryInfo->pUpstream)>0)) { + if (functionId == TSDB_FUNC_LAST_ROW && + ((!TSWINDOW_IS_EQUAL(pQueryInfo->window, TSWINDOW_INITIALIZER)) || (hasNormalColumnFilter(pQueryInfo)) || + taosArrayGetSize(pQueryInfo->pUpstream) > 0)) { cvtFunc.execFuncId = TSDB_FUNC_LAST; } @@ -2979,7 +2998,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); } - if (taosArrayGetSize(pItem->pNode->Expr.paramList) > 1 && (pItem->aliasName != NULL && strlen(pItem->aliasName) > 0)) { + if (taosArrayGetSize(pItem->pNode->Expr.paramList) > 1 && + (pItem->aliasName != NULL && strlen(pItem->aliasName) > 0)) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg8); } @@ -2992,7 +3012,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col SColumnIndex index = COLUMN_INDEX_INITIALIZER; - if (pParamElem->pNode->tokenId == TK_ALL) { // select table.* + if (pParamElem->pNode->tokenId == TK_ALL) { // select table.* SStrToken tmpToken = pParamElem->pNode->columnName; if (getTableIndexByName(&tmpToken, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { @@ -3008,14 +3028,15 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col SStrToken t = {.z = pSchema[j].name, .n = (uint32_t)strnlen(pSchema[j].name, TSDB_COL_NAME_LEN)}; setResultColName(name, pItem, cvtFunc.originFuncId, &t, true); - if (setExprInfoForFunctions(pCmd, pQueryInfo, &pSchema[j], cvtFunc, name, colIndex++, &index, - finalResult, pUdfInfo) != 0) { + if (setExprInfoForFunctions(pCmd, pQueryInfo, &pSchema[j], cvtFunc, name, colIndex++, &index, finalResult, + pUdfInfo) != 0) { return TSDB_CODE_TSC_INVALID_OPERATION; } } } else { - if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) { + if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != + 
TSDB_CODE_SUCCESS) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); } @@ -3030,13 +3051,14 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); } - char name[TSDB_COL_NAME_LEN] = {0}; + char name[TSDB_COL_NAME_LEN] = {0}; SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); bool multiColOutput = taosArrayGetSize(pItem->pNode->Expr.paramList) > 1; setResultColName(name, pItem, cvtFunc.originFuncId, &pParamElem->pNode->columnName, multiColOutput); - if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, name, colIndex++, &index, finalResult, pUdfInfo) != 0) { + if (setExprInfoForFunctions(pCmd, pQueryInfo, pSchema, cvtFunc, name, colIndex++, &index, finalResult, + pUdfInfo) != 0) { return TSDB_CODE_TSC_INVALID_OPERATION; } } @@ -3057,12 +3079,12 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col for (int32_t i = 0; i < tscGetNumOfColumns(pTableMetaInfo->pTableMeta); ++i) { SColumnIndex index = {.tableIndex = j, .columnIndex = i}; - char name[TSDB_COL_NAME_LEN] = {0}; + char name[TSDB_COL_NAME_LEN] = {0}; SStrToken t = {.z = pSchema[i].name, .n = (uint32_t)strnlen(pSchema[i].name, TSDB_COL_NAME_LEN)}; setResultColName(name, pItem, cvtFunc.originFuncId, &t, true); if (setExprInfoForFunctions(pCmd, pQueryInfo, &pSchema[index.columnIndex], cvtFunc, name, colIndex, &index, - finalResult, pUdfInfo) != 0) { + finalResult, pUdfInfo) != 0) { return TSDB_CODE_TSC_INVALID_OPERATION; } colIndex++; @@ -3079,18 +3101,21 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col case TSDB_FUNC_MAVG: case TSDB_FUNC_SAMPLE: case TSDB_FUNC_PERCT: - case TSDB_FUNC_APERCT: { + case TSDB_FUNC_APERCT: + case TSDB_FUNC_UNIQUE: { // 1. 
valid the number of parameters bool valid = true; - if(pItem->pNode->Expr.paramList == NULL) { + if (pItem->pNode->Expr.paramList == NULL) { valid = false; - } else if(functionId == TSDB_FUNC_APERCT) { + } else if (functionId == TSDB_FUNC_APERCT) { size_t cnt = taosArrayGetSize(pItem->pNode->Expr.paramList); - if(cnt != 2 && cnt !=3) valid = false; - } else { + if (cnt != 2 && cnt != 3) valid = false; + } else if (functionId == TSDB_FUNC_UNIQUE) { + if (taosArrayGetSize(pItem->pNode->Expr.paramList) != 1) valid = false; + }else { if (taosArrayGetSize(pItem->pNode->Expr.paramList) != 2) valid = false; } - if(!valid) { + if (!valid) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); } @@ -3098,16 +3123,20 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col if (pParamElem->pNode->tokenId != TK_ID) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); } - + SColumnIndex index = COLUMN_INDEX_INITIALIZER; - if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) { + if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != + TSDB_CODE_SUCCESS) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); } if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); } - + if (index.columnIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX && functionId == TSDB_FUNC_UNIQUE) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg29); + } + pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); @@ -3117,27 +3146,30 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col } // 2. 
valid the column type - if (functionId != TSDB_FUNC_SAMPLE && !IS_NUMERIC_TYPE(pSchema->type)) { + if (functionId != TSDB_FUNC_SAMPLE && functionId != TSDB_FUNC_UNIQUE && !IS_NUMERIC_TYPE(pSchema->type)) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); } - // 3. valid the parameters - if (pParamElem[1].pNode->tokenId == TK_ID) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); - } + tVariant* pVariant = NULL; + if (functionId != TSDB_FUNC_UNIQUE) { + // 3. valid the parameters + if (pParamElem[1].pNode->tokenId == TK_ID) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + } - tVariant* pVariant = &pParamElem[1].pNode->value; + pVariant = &pParamElem[1].pNode->value; + } - int16_t resultType = pSchema->type; - int32_t resultSize = pSchema->bytes; - int32_t interResult = 0; + int16_t resultType = pSchema->type; + int32_t resultSize = pSchema->bytes; + int32_t interResult = 0; char val[8] = {0}; SExprInfo* pExpr = NULL; if (functionId == TSDB_FUNC_PERCT || functionId == TSDB_FUNC_APERCT) { - // param1 double - if(pVariant->nType != TSDB_DATA_TYPE_DOUBLE && pVariant->nType != TSDB_DATA_TYPE_BIGINT){ + // param1 double + if (pVariant->nType != TSDB_DATA_TYPE_DOUBLE && pVariant->nType != TSDB_DATA_TYPE_BIGINT) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5); } tVariantDump(pVariant, val, TSDB_DATA_TYPE_DOUBLE, true); @@ -3147,18 +3179,19 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5); } - getResultDataInfo(pSchema->type, pSchema->bytes, functionId, 0, &resultType, &resultSize, &interResult, 0, false, - pUdfInfo); + getResultDataInfo(pSchema->type, pSchema->bytes, functionId, 0, &resultType, &resultSize, &interResult, 0, + false, pUdfInfo); /* - * sql function transformation - * for dp = 0, it is actually min, - * for dp = 100, it is max, + * sql function transformation + * for dp = 0, it is actually min, + * 
for dp = 100, it is max, */ tscInsertPrimaryTsSourceColumn(pQueryInfo, pTableMetaInfo->pTableMeta->id.uid); colIndex += 1; // the first column is ts - - pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), interResult, false); + + pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), + interResult, false); tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); // param2 int32 @@ -3166,15 +3199,15 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col if (pParamElem[2].pNode != NULL) { pVariant = &pParamElem[2].pNode->value; // check type must string - if(pVariant->nType != TSDB_DATA_TYPE_BINARY || pVariant->pz == NULL){ + if (pVariant->nType != TSDB_DATA_TYPE_BINARY || pVariant->pz == NULL) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg13); } - char* pzAlgo = pVariant->pz; + char* pzAlgo = pVariant->pz; int32_t algo = 0; - if(strcasecmp(pzAlgo, "t-digest") == 0) { + if (strcasecmp(pzAlgo, "t-digest") == 0) { algo = 1; - } else if(strcasecmp(pzAlgo, "default") == 0){ + } else if (strcasecmp(pzAlgo, "default") == 0) { algo = 0; } else { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg14); @@ -3190,7 +3223,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col char* endptr = NULL; strtoll(pParamElem[1].pNode->exprToken.z, &endptr, 10); - if ((endptr-pParamElem[1].pNode->exprToken.z != pParamElem[1].pNode->exprToken.n) || errno == ERANGE) { + if ((endptr - pParamElem[1].pNode->exprToken.z != pParamElem[1].pNode->exprToken.n) || errno == ERANGE) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg18); } tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true); @@ -3202,36 +3235,38 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col // todo REFACTOR // set the first column ts for top/bottom query - int32_t tsFuncId = (functionId == 
TSDB_FUNC_MAVG) ? TSDB_FUNC_TS_DUMMY : TSDB_FUNC_TS; + int32_t tsFuncId = (functionId == TSDB_FUNC_MAVG) ? TSDB_FUNC_TS_DUMMY : TSDB_FUNC_TS; SColumnIndex index1 = {index.tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX}; - pExpr = tscExprAppend(pQueryInfo, tsFuncId, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, 0, - 0, false); + pExpr = tscExprAppend(pQueryInfo, tsFuncId, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, 0, 0, false); tstrncpy(pExpr->base.aliasName, aAggs[tsFuncId].name, sizeof(pExpr->base.aliasName)); const int32_t TS_COLUMN_INDEX = PRIMARYKEY_TIMESTAMP_COL_INDEX; SColumnList ids = createColumnList(1, index.tableIndex, TS_COLUMN_INDEX); - insertResultField(pQueryInfo, colIndex, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, - aAggs[tsFuncId].name, pExpr); + insertResultField(pQueryInfo, colIndex, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[tsFuncId].name, + pExpr); colIndex += 1; // the first column is ts - getResultDataInfo(pSchema->type, pSchema->bytes, functionId, (int32_t)numRowsSelected, &resultType, &resultSize, &interResult, 0, false, - pUdfInfo); - pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), interResult, false); + getResultDataInfo(pSchema->type, pSchema->bytes, functionId, (int32_t)numRowsSelected, &resultType, + &resultSize, &interResult, 0, false, pUdfInfo); + pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), + interResult, false); tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t)); } else { tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true); int64_t numRowsSelected = GET_INT32_VAL(val); - if (numRowsSelected <= 0 || numRowsSelected > 100) { // todo use macro + if (functionId != TSDB_FUNC_UNIQUE && (numRowsSelected <= 0 || numRowsSelected > 100)) { // todo use macro return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg12); } + if(functionId == TSDB_FUNC_UNIQUE){ + GET_INT32_VAL(val) = 
MAX_UNIQUE_RESULT_ROWS; + } // todo REFACTOR // set the first column ts for top/bottom query SColumnIndex index1 = {index.tableIndex, PRIMARYKEY_TIMESTAMP_COL_INDEX}; - pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, 0, - 0, false); + pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS, &index1, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE, 0, 0, false); tstrncpy(pExpr->base.aliasName, aAggs[TSDB_FUNC_TS].name, sizeof(pExpr->base.aliasName)); const int32_t TS_COLUMN_INDEX = PRIMARYKEY_TIMESTAMP_COL_INDEX; @@ -3241,12 +3276,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col colIndex += 1; // the first column is ts - pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), resultSize, false); + pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), + resultSize, false); tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, sizeof(int64_t)); } - + memset(pExpr->base.aliasName, 0, tListLen(pExpr->base.aliasName)); - getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token,sizeof(pExpr->base.aliasName) - 1); + getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token, sizeof(pExpr->base.aliasName) - 1); // todo refactor: tscColumnListInsert part SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex); @@ -3260,39 +3296,40 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return TSDB_CODE_SUCCESS; } - + case TSDB_FUNC_TID_TAG: { pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7); } - + // no parameters or more than one parameter for function if (pItem->pNode->Expr.paramList == NULL || taosArrayGetSize(pItem->pNode->Expr.paramList) != 1) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); } - + tSqlExprItem* pParamItem = 
taosArrayGet(pItem->pNode->Expr.paramList, 0); - tSqlExpr* pParam = pParamItem->pNode; + tSqlExpr* pParam = pParamItem->pNode; SColumnIndex index = COLUMN_INDEX_INITIALIZER; - if (getColumnIndexByName(&pParam->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) { + if (getColumnIndexByName(&pParam->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != + TSDB_CODE_SUCCESS) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); } - + pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta); - + // functions can not be applied to normal columns int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta); if (index.columnIndex < numOfCols && index.columnIndex != TSDB_TBNAME_COLUMN_INDEX) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); } - + if (index.columnIndex > 0) { index.columnIndex -= numOfCols; } - + // 2. valid the column type int16_t colType = 0; if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { @@ -3300,7 +3337,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col } else { colType = pSchema[index.columnIndex].type; } - + if (colType == TSDB_DATA_TYPE_BOOL) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); } @@ -3315,20 +3352,20 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col } else { s = pTagSchema[index.columnIndex]; } - + int32_t bytes = 0; - int16_t type = 0; + int16_t type = 0; int32_t inter = 0; int32_t ret = getResultDataInfo(s.type, s.bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0, NULL); assert(ret == TSDB_CODE_SUCCESS); - + s.type = (uint8_t)type; s.bytes = bytes; TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY); tscAddFuncInSelectClause(pQueryInfo, 0, TSDB_FUNC_TID_TAG, &index, &s, TSDB_COL_TAG, getNewResColId(pCmd)); - + return TSDB_CODE_SUCCESS; } @@ -3338,12 +3375,15 @@ int32_t 
addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); } - SColumnIndex index = {.tableIndex = 0, .columnIndex = 0,}; + SColumnIndex index = { + .tableIndex = 0, + .columnIndex = 0, + }; pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); - int32_t inter = 0; + int32_t inter = 0; int16_t resType = 0; - int32_t bytes = 0; + int32_t bytes = 0; getResultDataInfo(TSDB_DATA_TYPE_INT, 4, TSDB_FUNC_BLKINFO, 0, &resType, &bytes, &inter, 0, 0, NULL); @@ -3375,7 +3415,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col } SColumnIndex index = COLUMN_INDEX_INITIALIZER; - if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) { + if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != + TSDB_CODE_SUCCESS) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); } @@ -3390,20 +3431,19 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); } - //bin_type param + // bin_type param if (pParamElem[1].pNode->tokenId == TK_ID) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); } - tVariant *pVariant = &pParamElem[1].pNode->value; + tVariant* pVariant = &pParamElem[1].pNode->value; if (pVariant == NULL || pVariant->nType != TSDB_DATA_TYPE_BINARY) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); } -#define USER_INPUT_BIN 0 -#define LINEAR_BIN 1 -#define LOG_BIN 2 - + #define USER_INPUT_BIN 0 + #define LINEAR_BIN 1 + #define LOG_BIN 2 int8_t binType; if (strcasecmp(pVariant->pz, "user_input") == 0) { binType = USER_INPUT_BIN; @@ -3414,7 +3454,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col } else { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg20); } - 
//bin_description param in JSON format + // bin_description param in JSON format if (pParamElem[2].pNode->tokenId == TK_ID) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); } @@ -3424,11 +3464,11 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); } - cJSON *binDesc = cJSON_Parse(pVariant->pz); + cJSON* binDesc = cJSON_Parse(pVariant->pz); int32_t counter; int32_t numBins; int32_t numOutput; - double *intervals; + double* intervals; if (cJSON_IsObject(binDesc)) { /* linaer/log bins */ int32_t numOfParams = cJSON_GetArraySize(binDesc); int32_t startIndex; @@ -3436,11 +3476,11 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg22); } - cJSON *start = cJSON_GetObjectItem(binDesc, "start"); - cJSON *factor = cJSON_GetObjectItem(binDesc, "factor"); - cJSON *width = cJSON_GetObjectItem(binDesc, "width"); - cJSON *count = cJSON_GetObjectItem(binDesc, "count"); - cJSON *infinity = cJSON_GetObjectItem(binDesc, "infinity"); + cJSON* start = cJSON_GetObjectItem(binDesc, "start"); + cJSON* factor = cJSON_GetObjectItem(binDesc, "factor"); + cJSON* width = cJSON_GetObjectItem(binDesc, "width"); + cJSON* count = cJSON_GetObjectItem(binDesc, "count"); + cJSON* infinity = cJSON_GetObjectItem(binDesc, "infinity"); if (!cJSON_IsNumber(start) || !cJSON_IsNumber(count) || !cJSON_IsBool(infinity)) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg22); @@ -3450,10 +3490,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg25); } - if (isinf(start->valuedouble) || - (width != NULL && isinf(width->valuedouble)) || - (factor != NULL && isinf(factor->valuedouble)) || - (count != NULL && isinf(count->valuedouble))) { + if (isinf(start->valuedouble) || (width != NULL && 
isinf(width->valuedouble)) || + (factor != NULL && isinf(factor->valuedouble)) || (count != NULL && isinf(count->valuedouble))) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg23); } @@ -3468,7 +3506,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col intervals = tcalloc(numBins, sizeof(double)); if (cJSON_IsNumber(width) && factor == NULL && binType == LINEAR_BIN) { - //linear bin process + // linear bin process if (width->valuedouble == 0) { tfree(intervals); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg24); @@ -3482,7 +3520,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col startIndex++; } } else if (cJSON_IsNumber(factor) && width == NULL && binType == LOG_BIN) { - //log bin process + // log bin process if (start->valuedouble == 0) { tfree(intervals); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg26); @@ -3511,7 +3549,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col tfree(intervals); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg23); } - //in case of desc bin orders, -inf/inf should be swapped + // in case of desc bin orders, -inf/inf should be swapped assert(numBins >= 4); if (intervals[1] > intervals[numBins - 2]) { SWAP(intervals[0], intervals[numBins - 1], double); @@ -3524,7 +3562,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col } counter = numBins = cJSON_GetArraySize(binDesc); intervals = tcalloc(numBins, sizeof(double)); - cJSON *bin = binDesc->child; + cJSON* bin = binDesc->child; if (bin == NULL) { tfree(intervals); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg22); @@ -3550,16 +3588,17 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col int16_t resultType = pSchema->type; int32_t resultSize = pSchema->bytes; int32_t interResult = 0; - getResultDataInfo(pSchema->type, pSchema->bytes, functionId, counter, 
&resultType, &resultSize, &interResult, 0, false, - pUdfInfo); + getResultDataInfo(pSchema->type, pSchema->bytes, functionId, counter, &resultType, &resultSize, &interResult, 0, + false, pUdfInfo); SExprInfo* pExpr = NULL; - pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), interResult, false); + pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), interResult, + false); numOutput = numBins - 1; tscExprAddParams(&pExpr->base, (char*)&numOutput, TSDB_DATA_TYPE_INT, sizeof(int32_t)); tscExprAddParams(&pExpr->base, (char*)intervals, TSDB_DATA_TYPE_BINARY, sizeof(double) * numBins); tfree(intervals); - //normalized param + // normalized param char val[8] = {0}; if (pParamElem[3].pNode->tokenId == TK_ID) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); @@ -3604,13 +3643,15 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg13); } - tSqlExprItem* pParamElem = taosArrayGet(pItem->pNode->Expr.paramList, 0);; + tSqlExprItem* pParamElem = taosArrayGet(pItem->pNode->Expr.paramList, 0); + ; if (pParamElem->pNode->tokenId != TK_ID) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); } SColumnIndex index = COLUMN_INDEX_INITIALIZER; - if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) { + if (getColumnIndexByName(&pParamElem->pNode->columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != + TSDB_CODE_SUCCESS) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); } @@ -3625,22 +3666,24 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); } - int32_t inter = 0; + int32_t inter = 0; int16_t resType = 0; - int32_t bytes = 0; + int32_t bytes = 0; getResultDataInfo(TSDB_DATA_TYPE_INT, 
4, functionId, 0, &resType, &bytes, &inter, 0, false, pUdfInfo); - SExprInfo* pExpr = tscExprAppend(pQueryInfo, functionId, &index, resType, bytes, getNewResColId(pCmd), inter, false); + SExprInfo* pExpr = + tscExprAppend(pQueryInfo, functionId, &index, resType, bytes, getNewResColId(pCmd), inter, false); memset(pExpr->base.aliasName, 0, tListLen(pExpr->base.aliasName)); getColumnName(pItem, pExpr->base.aliasName, pExpr->base.token, sizeof(pExpr->base.aliasName) - 1); SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); - uint64_t uid = pTableMetaInfo->pTableMeta->id.uid; + uint64_t uid = pTableMetaInfo->pTableMeta->id.uid; SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex); if (finalResult) { - insertResultField(pQueryInfo, colIndex, &ids, pUdfInfo->resBytes, pUdfInfo->resType, pExpr->base.aliasName, pExpr); + insertResultField(pQueryInfo, colIndex, &ids, pUdfInfo->resBytes, pUdfInfo->resType, pExpr->base.aliasName, + pExpr); } else { for (int32_t i = 0; i < ids.num; ++i) { tscColumnListInsert(pQueryInfo->colList, index.columnIndex, uid, pSchema); @@ -3653,7 +3696,6 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return TSDB_CODE_TSC_INVALID_OPERATION; } - // todo refactor static SColumnList createColumnList(int32_t num, int16_t tableIndex, int32_t columnIndex) { assert(num == 1 && tableIndex >= 0); @@ -3966,7 +4008,9 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) { (functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_STDDEV_DST) || (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_IRATE) || (functionId == TSDB_FUNC_SAMPLE) || - (functionId == TSDB_FUNC_ELAPSED) || (functionId == TSDB_FUNC_HISTOGRAM)) { + (functionId == TSDB_FUNC_ELAPSED) || + (functionId == TSDB_FUNC_HISTOGRAM) || + (functionId == TSDB_FUNC_UNIQUE)) { if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->base.param[0].i64, &type, 
&bytes, &interBytes, 0, true, NULL) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_OPERATION; @@ -6614,7 +6658,7 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); pQueryInfo->order.order = TSDB_ORDER_ASC; - if (isTopBottomQuery(pQueryInfo)) { + if (isTopBottomUniqueQuery(pQueryInfo)) { pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; } else { // in case of select tbname from super_table, the default order column can not be the primary ts column pQueryInfo->order.orderColId = INT32_MIN; // todo define a macro @@ -6770,7 +6814,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq } } - if (!(orderByTags || orderByTS || orderByGroupbyCol) && !isTopBottomQuery(pQueryInfo)) { + if (!(orderByTags || orderByTS || orderByGroupbyCol) && !isTopBottomUniqueQuery(pQueryInfo)) { return invalidOperationMsg(pMsgBuf, msg3); } @@ -6780,7 +6824,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq if (tscIsDiffDerivLikeQuery(pQueryInfo)) { return invalidOperationMsg(pMsgBuf, msg12); } - pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); + //pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); CommonItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); pQueryInfo->groupbyExpr.orderType = p1->sortOrder; @@ -6792,7 +6836,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq if (udf) { return invalidOperationMsg(pMsgBuf, msg11); } - } else if (isTopBottomQuery(pQueryInfo)) { + } else if (isTopBottomUniqueQuery(pQueryInfo)) { /* order of top/bottom query in interval is not valid */ int32_t pos = tscExprTopBottomIndex(pQueryInfo); @@ -6839,7 +6883,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq } else { pItem = taosArrayGet(pSqlNode->pSortOrder, 0); if 
(orderByTags) { - pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); + //pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); pQueryInfo->groupbyExpr.orderType = pItem->sortOrder; } else if (orderByGroupbyCol){ pQueryInfo->order.order = pItem->sortOrder; @@ -6884,7 +6928,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq return invalidOperationMsg(pMsgBuf, msg1); } - if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX && !isTopBottomQuery(pQueryInfo)) { + if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX && !isTopBottomUniqueQuery(pQueryInfo)) { bool validOrder = false; SArray *columnInfo = pQueryInfo->groupbyExpr.columnInfo; if (columnInfo != NULL && taosArrayGetSize(columnInfo) > 0) { @@ -6905,11 +6949,11 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq } CommonItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); - pQueryInfo->groupbyExpr.orderIndex = pSchema[index.columnIndex].colId; + //pQueryInfo->groupbyExpr.orderIndex = pSchema[index.columnIndex].colId; pQueryInfo->groupbyExpr.orderType = p1->sortOrder; } - if (isTopBottomQuery(pQueryInfo)) { + if (isTopBottomUniqueQuery(pQueryInfo)) { SArray *columnInfo = pQueryInfo->groupbyExpr.columnInfo; if (columnInfo != NULL && taosArrayGetSize(columnInfo) > 0) { SColIndex* pColIndex = taosArrayGet(columnInfo, 0); @@ -7983,7 +8027,7 @@ static void doUpdateSqlFunctionForTagPrj(SQueryInfo* pQueryInfo) { pExpr->base.functionId = TSDB_FUNC_TAG_DUMMY; tagLength += pExpr->base.resBytes; } else if (pExpr->base.functionId == TSDB_FUNC_PRJ && pExpr->base.colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { - pExpr->base.functionId = TSDB_FUNC_TS_DUMMY; + pExpr->base.functionId = TSDB_FUNC_TS_DUMMY; // ts_select ts,top(col,2) tagLength += pExpr->base.resBytes; } } @@ -8340,6 +8384,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* 
pQueryInfo, char* const char* msg4 = "retrieve tags not compatible with group by or interval query"; const char* msg5 = "functions can not be mixed up"; const char* msg6 = "TWA/Diff/Derivative/Irate/CSum/MAvg/Elapsed only support group by tbname"; + const char* msg7 = "unique function does not supportted in state window query"; // only retrieve tags, group by is not supportted if (tscQueryTags(pQueryInfo)) { @@ -8419,9 +8464,14 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char* } } + if (pQueryInfo->stateWindow && f == TSDB_FUNC_UNIQUE){ + return invalidOperationMsg(msg, msg7); + } + if (IS_MULTIOUTPUT(aAggs[f].status) && f != TSDB_FUNC_TOP && f != TSDB_FUNC_BOTTOM && f != TSDB_FUNC_DIFF && f != TSDB_FUNC_MAVG && f != TSDB_FUNC_CSUM && f != TSDB_FUNC_SAMPLE && - f != TSDB_FUNC_DERIVATIVE && f != TSDB_FUNC_TAGPRJ && f != TSDB_FUNC_PRJ) { + f != TSDB_FUNC_DERIVATIVE && f != TSDB_FUNC_TAGPRJ && f != TSDB_FUNC_PRJ && + f != TSDB_FUNC_UNIQUE) { return invalidOperationMsg(msg, msg1); } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index a3edca6ee229d459d4b9bee279c03750ef08132d..b59f7cc4db0c4e0cf6c5d2d04d5aeb1d5aa44154 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1045,8 +1045,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SGroupbyExpr *pGroupbyExpr = query.pGroupbyExpr; if (pGroupbyExpr != NULL && pGroupbyExpr->numOfGroupCols > 0) { - pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex); - pQueryMsg->orderType = htons(pGroupbyExpr->orderType); + //pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex); + pQueryMsg->groupOrderType = htons(pGroupbyExpr->orderType); for (int32_t j = 0; j < pGroupbyExpr->numOfGroupCols; ++j) { SColIndex* pCol = taosArrayGet(pGroupbyExpr->columnInfo, j); @@ -1947,7 +1947,6 @@ int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) { SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); if (pQueryInfo->pQInfo == NULL) { STableGroupInfo tableGroupInfo = 
{.numOfTables = 1, .pGroupList = taosArrayInit(1, POINTER_BYTES),}; - tableGroupInfo.map = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); STableKeyInfo tableKeyInfo = {.pTable = NULL, .lastKey = INT64_MIN}; @@ -1958,8 +1957,6 @@ int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) { tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute query processing", pSql->self, pSql->self); pQueryInfo->pQInfo = createQInfoFromQueryNode(pQueryInfo, &tableGroupInfo, NULL, NULL, pRes->pMerger, MERGE_STAGE, pSql->self); if (pQueryInfo->pQInfo == NULL) { - taosHashCleanup(tableGroupInfo.map); - taosArrayDestroy(&group); tscAsyncResultOnError(pSql); pRes->code = TSDB_CODE_QRY_OUT_OF_MEMORY; return pRes->code; diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 39289a55f482df04979ba79f500f9f19d04dac03..be9b3a2cf32e969594986066fdaba7af847607ac 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -3805,6 +3805,7 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGr assert(pQueryInfo != NULL); SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); if (pQInfo == NULL) { + tsdbDestroyTableGroup(pTableGroupInfo); goto _cleanup; } @@ -3913,6 +3914,7 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGr int32_t code = initQInfo(&bufInfo, NULL, pSourceOperator, pQInfo, &param, NULL, 0, merger); taosArrayDestroy(&pa); if (code != TSDB_CODE_SUCCESS) { + pQInfo = NULL; goto _cleanup; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 6d6e41aa848524fbdf9e11fdb7f7106a2380c14f..dfbe4441463a3a3e18c50955110bcc368549217d 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -74,11 +74,11 @@ int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *le break; case TSDB_DATA_TYPE_UINT: - n = sprintf(str, "%d", *(uint32_t*)buf); + n = sprintf(str, "%u", *(uint32_t*)buf); break; case
TSDB_DATA_TYPE_UBIGINT: - n = sprintf(str, "%" PRId64, *(uint64_t*)buf); + n = sprintf(str, "%" PRIu64, *(uint64_t*)buf); break; case TSDB_DATA_TYPE_FLOAT: @@ -304,7 +304,7 @@ bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableI return false; } - // order by columnIndex exists, not a non-ordered projection query + // order by columnIndex not exists, not a ordered projection query return pQueryInfo->order.orderColId < 0; } @@ -313,7 +313,7 @@ bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableInde return false; } - // order by columnIndex exists, a non-ordered projection query + // order by columnIndex exists, a ordered projection query return pQueryInfo->order.orderColId >= 0; } @@ -689,7 +689,8 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) { (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_TS_COMP || functionId == TSDB_FUNC_SAMPLE || - functionId == TSDB_FUNC_HISTOGRAM)) { + functionId == TSDB_FUNC_HISTOGRAM || + functionId == TSDB_FUNC_UNIQUE)) { return true; } } @@ -1404,8 +1405,6 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue } } - tableGroupInfo.map = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - STableKeyInfo tableKeyInfo = {.pTable = NULL, .lastKey = INT64_MIN}; SArray* group = taosArrayInit(1, sizeof(STableKeyInfo)); @@ -2614,7 +2613,7 @@ SExprInfo* tscExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnInde } SExprInfo* tscExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, - int16_t type, int16_t size) { + int16_t type, int32_t size) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); SExprInfo* pExpr = tscExprGet(pQueryInfo, index); if (pExpr == NULL) { @@ -2659,7 +2658,8 @@ int32_t tscExprTopBottomIndex(SQueryInfo* pQueryInfo){ SExprInfo* pExpr = tscExprGet(pQueryInfo, i); if (pExpr == NULL) continue; - if 
(pExpr->base.functionId == TSDB_FUNC_TOP || pExpr->base.functionId == TSDB_FUNC_BOTTOM) { + if (pExpr->base.functionId == TSDB_FUNC_TOP || pExpr->base.functionId == TSDB_FUNC_BOTTOM + || pExpr->base.functionId == TSDB_FUNC_UNIQUE) { return i; } } @@ -4937,7 +4937,11 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu pse->colInfo.colIndex = i; pse->colType = pExpr->base.resType; - pse->colBytes = pExpr->base.resBytes; + if(pExpr->base.resBytes > INT16_MAX && pExpr->base.functionId == TSDB_FUNC_UNIQUE){ + pQueryAttr->interBytesForGlobal = pExpr->base.resBytes; + }else{ + pse->colBytes = pExpr->base.resBytes; + } } { @@ -5081,6 +5085,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt pQueryAttr->pUdfInfo = pQueryInfo->pUdfInfo; pQueryAttr->range = pQueryInfo->range; + if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor pQueryAttr->window = pQueryInfo->window; } else { @@ -5112,6 +5117,8 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt } } + pQueryAttr->uniqueQuery = isUniqueQuery(numOfOutput, pQueryAttr->pExpr1); + pQueryAttr->tableCols = calloc(numOfCols, sizeof(SColumnInfo)); for(int32_t i = 0; i < numOfCols; ++i) { SColumn* pCol = taosArrayGetP(pQueryInfo->colList, i); @@ -5403,7 +5410,7 @@ int parseJsontoTagData(char* json, SKVRowBuilder* kvRowBuilder, char* errMsg, in // set json real data cJSON *root = cJSON_Parse(json); if (root == NULL){ - tscError("json parse error"); + tscError("json parse error:%s", json); return tscSQLSyntaxErrMsg(errMsg, "json parse error", NULL); } diff --git a/src/common/inc/tname.h b/src/common/inc/tname.h index 7a401d8a7f71c094654d06a2ed37ae3fd7fc9c94..e6b7dd1463a754bfaa78bb1081e3b6b0b753d752 100644 --- a/src/common/inc/tname.h +++ b/src/common/inc/tname.h @@ -54,7 +54,7 @@ typedef struct SSqlExpr { int32_t resBytes; // length of return value int32_t interBytes; // inter result buffer size - int16_t colType; // table 
column type + int16_t colType; // table column type, this should be int32_t, because it is too small for globale merge stage, pQueryAttr->interBytesForGlobal int16_t colBytes; // table column bytes int16_t numOfParams; // argument value of each function diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 44192403972cd9dc54b3f2a965e1468595e17487..64065d0b4672a36c0510242cf9d52830aeccc67b 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -293,6 +293,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070D) //"System error") #define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070E) //"invalid time condition") #define TSDB_CODE_QRY_INVALID_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0710) //"invalid schema version") +#define TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE TAOS_DEF_ERROR_CODE(0, 0x0711) //"unique result num is too large") // grant #define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired" diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 2c4d21037c9697e832bccf082595408c712d0670..e5c390f9191f1aad622a9b8787d4643791c2a870 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -503,8 +503,8 @@ typedef struct { uint32_t tagCondLen; // tag length in current query int32_t colCondLen; // column length in current query int16_t numOfGroupCols; // num of group by columns - int16_t orderByIdx; - int16_t orderType; // used in group by xx order by xxx + int16_t orderByIdx; // useless + int16_t groupOrderType; // used for group order int64_t vgroupLimit; // limit the number of rows for each table, used in order by + limit in stable projection query. int16_t prjOrder; // global order in super table projection query. 
int64_t limit; diff --git a/src/query/inc/qAggMain.h b/src/query/inc/qAggMain.h index 444612f15771212757f20234c51d4b6c29a44180..aa5e2abd803d611be005115ba387a45e1138ed56 100644 --- a/src/query/inc/qAggMain.h +++ b/src/query/inc/qAggMain.h @@ -78,8 +78,9 @@ extern "C" { #define TSDB_FUNC_ELAPSED 37 #define TSDB_FUNC_HISTOGRAM 38 +#define TSDB_FUNC_UNIQUE 39 -#define TSDB_FUNC_MAX_NUM 39 +#define TSDB_FUNC_MAX_NUM 40 #define TSDB_FUNCSTATE_SO 0x1u // single output #define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM @@ -174,7 +175,7 @@ typedef struct SQLFunctionCtx { void * pInput; // input data buffer uint32_t order; // asc|desc int16_t inputType; - int16_t inputBytes; + int32_t inputBytes; int16_t outputType; int32_t outputBytes; // size of results, determined by function and input column data type @@ -200,6 +201,8 @@ typedef struct SQLFunctionCtx { SExtTagsInfo tagInfo; SPoint1 start; SPoint1 end; + + SHashObj **pUniqueSet; // for unique function } SQLFunctionCtx; typedef struct SAggFunctionInfo { @@ -249,7 +252,7 @@ void blockDistInfoToBinary(STableBlockDist* pDist, struct SBufferWriter* bw); void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDist); /* global sql function array */ -extern struct SAggFunctionInfo aAggs[40]; +extern struct SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM]; extern int32_t functionCompatList[]; // compatible check array list diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index c654047932f5c99a5e30d46e44efc9d7631e2136..c4aebc07b15749da343b4d0175812ca6e4211021 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -90,6 +90,7 @@ typedef struct SResultRow { SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo STimeWindow win; char *key; // start key of current result row + SHashObj *uniqueHash; // for unique function } SResultRow; typedef struct SResultRowCell { @@ -221,6 +222,7 @@ typedef struct 
SQueryAttr { bool stableQuery; // super table query or not bool topBotQuery; // TODO used bitwise flag + bool uniqueQuery; bool groupbyColumn; // denote if this is a groupby normal column query bool hasTagResults; // if there are tag values in final result or not bool timeWindowInterpo;// if the time window start/end required interpolation @@ -281,6 +283,7 @@ typedef struct SQueryAttr { STableGroupInfo tableGroupInfo; // table list SArray int32_t vgId; SArray *pUdfInfo; // no need to free + int32_t interBytesForGlobal; } SQueryAttr; typedef SSDataBlock* (*__operator_fn_t)(void* param, bool* newgroup); @@ -730,4 +733,5 @@ void addTableReadRows(SQueryRuntimeEnv* pEnv, int32_t tid, int32_t rows); // tsdb scan table callback table or query is over. param is SQueryRuntimeEnv* bool qReadOverCB(void* param, int8_t type, int32_t tid); +bool isUniqueQuery(int32_t numOfOutput, SExprInfo* pExprs); #endif // TDENGINE_QEXECUTOR_H diff --git a/src/query/inc/qExtbuffer.h b/src/query/inc/qExtbuffer.h index ccdfd5c05994b71bd911c3a66d02dc1ffa58a474..abcf11bfa54d1950edc7e42e8b76b0121fcc4c2c 100644 --- a/src/query/inc/qExtbuffer.h +++ b/src/query/inc/qExtbuffer.h @@ -53,14 +53,14 @@ typedef struct tFlushoutInfo { } tFlushoutInfo; typedef struct tFlushoutData { - uint32_t nAllocSize; - uint32_t nLength; - tFlushoutInfo *pFlushoutInfo; + uint32_t nAllocSize; // capacity + uint32_t nLength; // size + tFlushoutInfo *pFlushoutInfo; // dynamic allocate } tFlushoutData; typedef struct SExtFileInfo { - uint32_t nFileSize; // in pages - uint32_t pageSize; + uint32_t nFileSize; // how many pages in file + //uint32_t pageSize; // useless uint32_t numOfElemsInFile; tFlushoutData flushoutData; } SExtFileInfo; diff --git a/src/query/inc/qResultbuf.h b/src/query/inc/qResultbuf.h index f0c4aa3702cc083f7cc2ceaf1afabde21a3de73b..d4194168e565fd8e1202985d3597ace56326e92e 100644 --- a/src/query/inc/qResultbuf.h +++ b/src/query/inc/qResultbuf.h @@ -78,7 +78,8 @@ typedef struct SDiskbasedResultBuf { #define 
DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes #define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1} - +#define MAX_UNIQUE_RESULT_ROWS (1000) +#define MAX_UNIQUE_RESULT_SIZE (1024*1024*1) /** * create disk-based result buffer * @param pResultBuf diff --git a/src/query/inc/qTableMeta.h b/src/query/inc/qTableMeta.h index d47189691ebbe2c4ec3ad55dd72306686586a56e..e9b98cfe44f7bea24d4e680472d253ce4c9ce626 100644 --- a/src/query/inc/qTableMeta.h +++ b/src/query/inc/qTableMeta.h @@ -50,7 +50,7 @@ typedef struct SGroupbyExpr { int16_t tableIndex; SArray* columnInfo; // SArray, group by columns information int16_t numOfGroupCols; // todo remove it - int16_t orderIndex; // order by column index + //int16_t orderIndex; // order by column index, rm useless orderIndex int16_t orderType; // order by type: asc/desc } SGroupbyExpr; diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index 0882df77c2a8bc38560269ce093568fd96467dae..ce0a0648f554e007e46d441d9607d1d8edb971e3 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -70,8 +70,12 @@ static FORCE_INLINE char* getPosInResultPage(SQueryAttr* pQueryAttr, tFilePage* int32_t offset) { assert(rowOffset >= 0 && pQueryAttr != NULL); - int32_t numOfRows = (int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery); - return ((char *)page->data) + rowOffset + offset * numOfRows; + int64_t numOfRows = (int64_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery); + numOfRows *= offset; + if(numOfRows >= INT32_MAX){ + assert(0); + } + return ((char *)page->data) + rowOffset + numOfRows; } bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type); diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 2069fe7578cb68aff8ff98e20eaa50d323d564ff..b294c0482f0d2002cca7255f572d527ec21b543b 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -223,6 +223,16 @@ typedef struct{ 
SHistogramFuncBin* orderedBins; } SHistogramFuncInfo; +typedef struct { + int64_t timestamp; + char data[]; +} UniqueUnit; + +typedef struct { + int32_t num; + char res[]; +} SUniqueFuncInfo; + int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type, int32_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) { if (!isValidDataType(dataType)) { @@ -353,6 +363,18 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI *bytes = (sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param); *interBytes = *bytes; + return TSDB_CODE_SUCCESS; + } else if (functionId == TSDB_FUNC_UNIQUE) { + *type = TSDB_DATA_TYPE_BINARY; + int64_t size = sizeof(UniqueUnit) + dataBytes + extLength; + size *= param; + size += sizeof(SUniqueFuncInfo); + if (size > MAX_UNIQUE_RESULT_SIZE){ + size = MAX_UNIQUE_RESULT_SIZE; + } + *bytes = size; + *interBytes = *bytes; + return TSDB_CODE_SUCCESS; } else if (functionId == TSDB_FUNC_SAMPLE) { *type = TSDB_DATA_TYPE_BINARY; @@ -477,10 +499,20 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI *type = (int16_t)dataType; *bytes = dataBytes; - size_t size = sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param; + size_t size = sizeof(STopBotInfo) + (sizeof(tValuePair) + extLength) * param; // the output column may be larger than sizeof(STopBotInfo) *interBytes = (int32_t)size; + } else if (functionId == TSDB_FUNC_UNIQUE) { + *type = (int16_t)dataType; + *bytes = dataBytes; + int64_t size = sizeof(UniqueUnit) + dataBytes + extLength; + size *= param; + size += sizeof(SUniqueFuncInfo); + if (size > MAX_UNIQUE_RESULT_SIZE){ + size = MAX_UNIQUE_RESULT_SIZE; + } + *interBytes = (int32_t)size; } else if (functionId == TSDB_FUNC_SAMPLE) { *type = (int16_t)dataType; *bytes = dataBytes; @@ -2130,7 +2162,7 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t 
type) { int32_t step = QUERY_ASC_FORWARD_STEP; int32_t len = (int32_t)(GET_RES_INFO(pCtx)->numOfRes); - + switch (type) { case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_INT: { @@ -5097,6 +5129,194 @@ static void histogram_func_finalizer(SQLFunctionCtx *pCtx) { doFinalizer(pCtx); } +// unique use the intermediate result buffer to keep the intermediate result +static SUniqueFuncInfo *getUniqueOutputInfo(SQLFunctionCtx *pCtx) { + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + + // only the first_stage_merge is directly written data into final output buffer + if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) { + return (SUniqueFuncInfo*) pCtx->pOutput; + } else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer + return GET_ROWCELL_INTERBUF(pResInfo); + } +} + +// unique +static void copyUniqueRes(SQLFunctionCtx *pCtx, int32_t bytes) { + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + SUniqueFuncInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); + + size_t size = sizeof(UniqueUnit) + bytes + pCtx->tagInfo.tagsLen; + int32_t len = (int32_t)(GET_RES_INFO(pCtx)->numOfRes); + + char *tsOutput = pCtx->ptsOutputBuf; + char *output = pCtx->pOutput; + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->param[2].i64); + char *tvp = pRes->res + (size * ((pCtx->param[2].i64 == TSDB_ORDER_ASC) ? 0 : len -1)); + for (int32_t i = 0; i < len; ++i) { + memcpy(tsOutput, tvp, sizeof(int64_t)); + memcpy(output, tvp + sizeof(UniqueUnit), bytes); + tvp += (step * size); + tsOutput += sizeof(int64_t); + output += bytes; + } + + // set the corresponding tag data for each record + // todo check malloc failure + if (pCtx->tagInfo.numOfTagCols == 0) { + return ; + } + + char **pData = calloc(pCtx->tagInfo.numOfTagCols, POINTER_BYTES); + for (int32_t i = 0; i < pCtx->tagInfo.numOfTagCols; ++i) { + pData[i] = pCtx->tagInfo.pTagCtxList[i]->pOutput; + } + + tvp = pRes->res + (size * ((pCtx->param[2].i64 == TSDB_ORDER_ASC) ? 
0 : len -1)); + for (int32_t i = 0; i < len; ++i) { + int16_t offset = sizeof(UniqueUnit) + bytes; + for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) { + memcpy(pData[j], tvp + offset, (size_t)pCtx->tagInfo.pTagCtxList[j]->outputBytes); + offset += pCtx->tagInfo.pTagCtxList[j]->outputBytes; + pData[j] += pCtx->tagInfo.pTagCtxList[j]->outputBytes; + } + tvp += (step * size); + } + + tfree(pData); +} + +static bool unique_function_setup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { + if (!function_setup(pCtx, pResInfo)) { + return false; + } + if(*pCtx->pUniqueSet != NULL){ + taosHashClear(*pCtx->pUniqueSet); + }else{ + *pCtx->pUniqueSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + } + + return true; +} + +static void do_unique_function(SQLFunctionCtx *pCtx, SUniqueFuncInfo *pInfo, TSKEY timestamp, char *pData, char *tag, int32_t bytes, int16_t type){ + int32_t hashKeyBytes = bytes; + if(IS_VAR_DATA_TYPE(type)){ // for var data, we can not use bytes, because there are dirty data in the back of var data + hashKeyBytes = varDataTLen(pData); + } + UniqueUnit **unique = taosHashGet(*pCtx->pUniqueSet, pData, hashKeyBytes); + if (unique == NULL) { + size_t size = sizeof(UniqueUnit) + bytes + pCtx->tagInfo.tagsLen; + char *tmp = pInfo->res + pInfo->num * size; + ((UniqueUnit*)tmp)->timestamp = timestamp; + char *data = tmp + sizeof(UniqueUnit); + char *tags = tmp + sizeof(UniqueUnit) + bytes; + memcpy(data, pData, bytes); + + if (pCtx->currentStage == MERGE_STAGE && tag != NULL) { + memcpy(tags, tag, (size_t)pCtx->tagInfo.tagsLen); + }else{ + int32_t offset = 0; + for (int32_t j = 0; j < pCtx->tagInfo.numOfTagCols; ++j) { + SQLFunctionCtx *tagCtx = pCtx->tagInfo.pTagCtxList[j]; + if (tagCtx->functionId == TSDB_FUNC_TS_DUMMY) { + tagCtx->tag.nType = TSDB_DATA_TYPE_BIGINT; + tagCtx->tag.i64 = timestamp; + } + + tVariantDump(&tagCtx->tag, tagCtx->pOutput, tagCtx->tag.nType, true); + memcpy(tags + offset, 
tagCtx->pOutput, tagCtx->outputBytes); + offset += tagCtx->outputBytes; + } + } + + taosHashPut(*pCtx->pUniqueSet, pData, hashKeyBytes, &tmp, sizeof(UniqueUnit*)); + pInfo->num++; + }else if((*unique)->timestamp > timestamp){ + (*unique)->timestamp = timestamp; + } +} + +static void unique_function(SQLFunctionCtx *pCtx) { + SUniqueFuncInfo *pInfo = getUniqueOutputInfo(pCtx); + + for (int32_t i = 0; i < pCtx->size; i++) { + char *pData = GET_INPUT_DATA(pCtx, i); + TSKEY k = 0; + if (pCtx->ptsList != NULL) { + k = GET_TS_DATA(pCtx, i); + } + do_unique_function(pCtx, pInfo, k, pData, NULL, pCtx->inputBytes, pCtx->inputType); + + if (sizeof(SUniqueFuncInfo) + pInfo->num * (sizeof(UniqueUnit) + pCtx->inputBytes + pCtx->tagInfo.tagsLen) >= MAX_UNIQUE_RESULT_SIZE){ + GET_RES_INFO(pCtx)->numOfRes = -1; // mark out of memory + return; + } + } + + GET_RES_INFO(pCtx)->numOfRes = 1; +} + +static void unique_function_merge(SQLFunctionCtx *pCtx) { + SUniqueFuncInfo *pInput = (SUniqueFuncInfo *)GET_INPUT_DATA_LIST(pCtx); + SUniqueFuncInfo *pOutput = getUniqueOutputInfo(pCtx); + size_t size = sizeof(UniqueUnit) + pCtx->outputBytes + pCtx->tagInfo.tagsLen; + for (int32_t i = 0; i < pInput->num; ++i) { + char *tmp = pInput->res + i* size; + TSKEY timestamp = ((UniqueUnit*)tmp)->timestamp; + char *data = tmp + sizeof(UniqueUnit); + char *tags = tmp + sizeof(UniqueUnit) + pCtx->outputBytes; + do_unique_function(pCtx, pOutput, timestamp, data, tags, pCtx->outputBytes, pCtx->outputType); + + if (sizeof(SUniqueFuncInfo) + pOutput->num * (sizeof(UniqueUnit) + pCtx->outputBytes + pCtx->tagInfo.tagsLen) >= MAX_UNIQUE_RESULT_SIZE){ + GET_RES_INFO(pCtx)->numOfRes = -1; // mark out of memory + return; + } + } + + GET_RES_INFO(pCtx)->numOfRes = pOutput->num; +} + +typedef struct{ + int32_t dataOffset; + __compar_fn_t comparFn; +} UiqueSupporter; + +static int32_t uniqueCompareFn(const void *p1, const void *p2, const void *param) { + UiqueSupporter *support = (UiqueSupporter *)param; + return 
support->comparFn(p1 + support->dataOffset, p2 + support->dataOffset); +} + +static void unique_func_finalizer(SQLFunctionCtx *pCtx) { + SUniqueFuncInfo *pInfo = getUniqueOutputInfo(pCtx); + + GET_RES_INFO(pCtx)->numOfRes = pInfo->num; + int32_t bytes = 0; + int32_t type = 0; + if (pCtx->currentStage == MERGE_STAGE) { + bytes = pCtx->outputBytes; + type = pCtx->outputType; + assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); + } else { + bytes = pCtx->inputBytes; + type = pCtx->inputType; + } + UiqueSupporter support = {0}; + // user specify the order of output by sort the result according to timestamp + if (pCtx->param[1].i64 == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + support.dataOffset = 0; + support.comparFn = compareInt64Val; + } else{ + support.dataOffset = sizeof(UniqueUnit); + support.comparFn = getComparFunc(type, 0); + } + + size_t size = sizeof(UniqueUnit) + bytes + pCtx->tagInfo.tagsLen; + taosqsort(pInfo->res, (size_t)GET_RES_INFO(pCtx)->numOfRes, size, &support, uniqueCompareFn); + copyUniqueRes(pCtx, bytes); + doFinalizer(pCtx); +} + ///////////////////////////////////////////////////////////////////////////////////////////// /* * function compatible list. 
@@ -5117,11 +5337,11 @@ int32_t functionCompatList[] = { 1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1, // tid_tag, deriv, csum, mavg, sample, 6, 8, -1, -1, -1, - // block_info,elapsed,histogram - 7, 1, -1 + // block_info,elapsed,histogram,unique + 7, 1, -1, -1 }; -SAggFunctionInfo aAggs[40] = {{ +SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{ // 0, count function does not invoke the finalize function "count", TSDB_FUNC_COUNT, @@ -5591,5 +5811,17 @@ SAggFunctionInfo aAggs[40] = {{ histogram_func_finalizer, histogram_func_merge, dataBlockRequired, + }, + { + // 39 + "unique", + TSDB_FUNC_UNIQUE, + TSDB_FUNC_UNIQUE, + TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_SELECTIVITY, + unique_function_setup, + unique_function, + unique_func_finalizer, + unique_function_merge, + dataBlockRequired, } }; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 85146cb0223fdc3fcb4e1ee39ed74bb424ed74b3..0e1cf3a8830d12ad33d994bf421d70c7eaeec274 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -281,7 +281,7 @@ static int compareRowData(const void *a, const void *b, const void *userData) { tFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pRow1->pageId); tFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pRow2->pageId); - int16_t offset = supporter->dataOffset; + int32_t offset = supporter->dataOffset; char *in1 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page1, pRow1->offset, offset); char *in2 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page2, pRow2->offset, offset); @@ -289,9 +289,8 @@ static int compareRowData(const void *a, const void *b, const void *userData) { } static void sortGroupResByOrderList(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pDataBlock, SQLFunctionCtx *pCtx) { - SArray *columnOrderList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); - size_t size = taosArrayGetSize(columnOrderList); - taosArrayDestroy(&columnOrderList); + int32_t size = 
pRuntimeEnv->pQueryAttr->pGroupbyExpr == NULL? 0: pRuntimeEnv->pQueryAttr->pGroupbyExpr->numOfGroupCols; + if (pRuntimeEnv->pQueryAttr->interval.interval > 0) size++; if (size <= 0) { return; @@ -357,7 +356,13 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO idata.info.bytes = pExpr[i].base.resBytes; idata.info.colId = pExpr[i].base.resColId; - int32_t size = MAX(idata.info.bytes * numOfRows, minSize); + int64_t tmp = idata.info.bytes; + tmp *= numOfRows; + if (tmp >= 1024*1024*1024) { // 1G + qError("size is too large, failed to allocate column buffer for output buffer"); + tmp = 128*1024*1024; + } + int32_t size = MAX(tmp, minSize); idata.pData = calloc(1, size); // at least to hold a pointer on x64 platform if (idata.pData == NULL) { qError("failed to allocate column buffer for output buffer"); @@ -1004,6 +1009,13 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx } } + if (functionId == TSDB_FUNC_UNIQUE && + (GET_RES_INFO(&(pCtx[k]))->numOfRes > MAX_UNIQUE_RESULT_ROWS || GET_RES_INFO(&(pCtx[k]))->numOfRes == -1)){ + qError("Unique result num is too large. num: %d, limit: %d", + GET_RES_INFO(&(pCtx[k]))->numOfRes, MAX_UNIQUE_RESULT_ROWS); + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE); + } + // restore it pCtx[k].preAggVals.isSet = hasAggregates; pCtx[k].pInput = start; @@ -1263,6 +1275,13 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction } else { assert(0); } + + if (functionId == TSDB_FUNC_UNIQUE && + (GET_RES_INFO(&(pCtx[k]))->numOfRes > MAX_UNIQUE_RESULT_ROWS || GET_RES_INFO(&(pCtx[k]))->numOfRes == -1)){ + qError("Unique result num is too large. 
num: %d, limit: %d", + GET_RES_INFO(&(pCtx[k]))->numOfRes, MAX_UNIQUE_RESULT_ROWS); + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE); + } } } } @@ -1893,7 +1912,7 @@ static int32_t setCtxTagColumnInfo(SQLFunctionCtx *pCtx, int32_t numOfOutput) { continue; } - if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { + if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { //ts_select ts,top(col,2) tagLen += pCtx[i].outputBytes; pTagCtx[num++] = &pCtx[i]; } else if ((aAggs[functionId].status & TSDB_FUNCSTATE_SELECTIVITY) != 0) { @@ -1945,8 +1964,12 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr pCtx->requireNull = false; } - pCtx->inputBytes = pSqlExpr->colBytes; pCtx->inputType = pSqlExpr->colType; + if (pRuntimeEnv->pQueryAttr->interBytesForGlobal > INT16_MAX && pSqlExpr->functionId == TSDB_FUNC_UNIQUE){ + pCtx->inputBytes = pRuntimeEnv->pQueryAttr->interBytesForGlobal; + }else{ + pCtx->inputBytes = pSqlExpr->colBytes; + } pCtx->ptsOutputBuf = NULL; @@ -1980,7 +2003,8 @@ static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExpr // set the order information for top/bottom query int32_t functionId = pCtx->functionId; - if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) { + if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM + || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_UNIQUE) { int32_t f = pExpr[i-1].base.functionId; assert(f == TSDB_FUNC_TS || f == TSDB_FUNC_TS_DUMMY); @@ -3165,7 +3189,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa if ((*status) != BLK_DATA_ALL_NEEDED) { // the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet, // the filter result may be incorrect. 
So in case of interval query, we need to set the correct time output buffer - if (QUERY_IS_INTERVAL_QUERY(pQueryAttr) && (!pQueryAttr->pointInterpQuery)) { + if (QUERY_IS_INTERVAL_QUERY(pQueryAttr) && (!pQueryAttr->pointInterpQuery) && (!pQueryAttr->uniqueQuery)) { SResultRow* pResult = NULL; bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); @@ -3177,7 +3201,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - } else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery) && (!pQueryAttr->pointInterpQuery)) { // stable aggregate, not interval aggregate or normal column aggregate + } else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery) && (!pQueryAttr->pointInterpQuery) && (!pQueryAttr->uniqueQuery)) { // stable aggregate, not interval aggregate or normal column aggregate doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx, pTableScanInfo->rowCellInfoOffset, pTableScanInfo->numOfOutput, pRuntimeEnv->current->groupIndex); @@ -3664,6 +3688,9 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i RESET_RESULT_INFO(pCellInfo); pCtx[i].resultInfo = pCellInfo; + if (pCtx[i].functionId == TSDB_FUNC_UNIQUE) { + pCtx[i].pUniqueSet = &pRow->uniqueHash; + } pCtx[i].pOutput = pData->pData; pCtx[i].currentStage = stage; assert(pCtx[i].pOutput != NULL); @@ -3671,7 +3698,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i // set the timestamp output buffer for top/bottom/diff query int32_t fid = pCtx[i].functionId; if (fid == TSDB_FUNC_TOP || fid == TSDB_FUNC_BOTTOM || fid == TSDB_FUNC_DIFF || fid == TSDB_FUNC_DERIVATIVE || - fid == TSDB_FUNC_SAMPLE || fid == TSDB_FUNC_MAVG || fid == TSDB_FUNC_CSUM) { + fid == TSDB_FUNC_SAMPLE || fid == TSDB_FUNC_MAVG || fid == 
TSDB_FUNC_CSUM || fid == TSDB_FUNC_UNIQUE) { if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput; } else if (fid == TSDB_FUNC_INTERP) { assert(pCtx[0].functionId == TSDB_FUNC_TS_DUMMY || pCtx[0].functionId == TSDB_FUNC_TS); @@ -3742,7 +3769,7 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE || functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_MAVG || - functionId == TSDB_FUNC_SAMPLE ) { + functionId == TSDB_FUNC_SAMPLE || functionId == TSDB_FUNC_UNIQUE) { if (i > 0) pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput; } else if (functionId == TSDB_FUNC_INTERP) { assert(pBInfo->pCtx[0].functionId == TSDB_FUNC_TS_DUMMY || pBInfo->pCtx[0].functionId == TSDB_FUNC_TS); @@ -3918,6 +3945,15 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult } } +bool isUniqueQuery(int32_t numOfOutput, SExprInfo* pExprs) { + for (int32_t i = 0; i < numOfOutput; ++i) { + if (pExprs[i].base.functionId == TSDB_FUNC_UNIQUE) { + return true; + } + } + return false; +} + static bool hasMainOutput(SQueryAttr *pQueryAttr) { for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { int32_t functionId = pQueryAttr->pExpr1[i].base.functionId; @@ -3989,6 +4025,9 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe int32_t offset = 0; for (int32_t i = 0; i < numOfOutput; ++i) { pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset); + if (pCtx[i].functionId == TSDB_FUNC_UNIQUE){ + pCtx[i].pUniqueSet = &pResult->uniqueHash; + } SResultRowCellInfo* pResInfo = pCtx[i].resultInfo; if (pResInfo->initialized && pResInfo->complete) { @@ -4002,7 +4041,8 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe int32_t functionId = pCtx[i].functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == 
TSDB_FUNC_DIFF || - functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_MAVG || functionId == TSDB_FUNC_SAMPLE) { + functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_MAVG || functionId == TSDB_FUNC_SAMPLE || + functionId == TSDB_FUNC_UNIQUE) { if(i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput; } @@ -4063,7 +4103,7 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId); - int16_t offset = 0; + int32_t offset = 0; for (int32_t i = 0; i < numOfCols; ++i) { pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResult->offset, offset); offset += pCtx[i].outputBytes; @@ -4071,7 +4111,8 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF int32_t functionId = pCtx[i].functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE || - functionId == TSDB_FUNC_SAMPLE || functionId == TSDB_FUNC_MAVG || functionId == TSDB_FUNC_CSUM) { + functionId == TSDB_FUNC_SAMPLE || functionId == TSDB_FUNC_MAVG || + functionId == TSDB_FUNC_CSUM || functionId == TSDB_FUNC_UNIQUE) { if(i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput; } @@ -4080,6 +4121,9 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF * not all queries require the interResultBuf, such as COUNT */ pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset); + if (pCtx[i].functionId == TSDB_FUNC_UNIQUE) { + pCtx[i].pUniqueSet = &pResult->uniqueHash; + } } } @@ -5511,10 +5555,6 @@ SArray* getOrderCheckColumns(SQueryAttr* pQuery) { } if (pQuery->interval.interval > 0) { - if (pOrderColumns == NULL) { - pOrderColumns = taosArrayInit(1, sizeof(SColIndex)); - } - SColIndex colIndex = {.colIndex = 0, .colId = 0, .flag = TSDB_COL_NORMAL}; 
taosArrayPush(pOrderColumns, &colIndex); } @@ -8753,8 +8793,8 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { pMsg += tListLen(param->pGroupColIndex[i].name); } - pQueryMsg->orderByIdx = htons(pQueryMsg->orderByIdx); - pQueryMsg->orderType = htons(pQueryMsg->orderType); + //pQueryMsg->orderByIdx = htons(pQueryMsg->orderByIdx); + pQueryMsg->groupOrderType = htons(pQueryMsg->groupOrderType); } pQueryMsg->fillType = htons(pQueryMsg->fillType); @@ -8962,7 +9002,7 @@ static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SCol for (int32_t i = 0; i < numOfOutput; ++i) { int16_t functId = pExprs[i].base.functionId; - if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM || functId == TSDB_FUNC_SAMPLE) { + if (functId == TSDB_FUNC_TOP || functId == TSDB_FUNC_BOTTOM || functId == TSDB_FUNC_SAMPLE || functId == TSDB_FUNC_UNIQUE) { int32_t j = getColumnIndexInSource(pTableInfo, &pExprs[i].base, pTagCols); if (j < 0 || j >= pTableInfo->numOfCols) { return TSDB_CODE_QRY_INVALID_MSG; @@ -9331,8 +9371,8 @@ SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pCo } pGroupbyExpr->numOfGroupCols = pQueryMsg->numOfGroupCols; - pGroupbyExpr->orderType = pQueryMsg->orderType; - pGroupbyExpr->orderIndex = pQueryMsg->orderByIdx; + pGroupbyExpr->orderType = pQueryMsg->groupOrderType; + //pGroupbyExpr->orderIndex = pQueryMsg->orderByIdx; pGroupbyExpr->columnInfo = taosArrayInit(pQueryMsg->numOfGroupCols, sizeof(SColIndex)); for(int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) { @@ -9547,7 +9587,8 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S pQueryAttr->vgId = vgId; pQueryAttr->pFilters = pFilters; pQueryAttr->range = pQueryMsg->range; - + pQueryAttr->uniqueQuery = isUniqueQuery(numOfOutput, pExprs); + pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); if (pQueryAttr->tableCols == NULL) { goto _cleanup; diff --git a/src/query/src/qExtbuffer.c 
b/src/query/src/qExtbuffer.c index 9d174b0389d74073b5989af5a8fd7c26d5fd80dd..5b210f882415f3995cf4fcde1ba3087397bb75b6 100644 --- a/src/query/src/qExtbuffer.c +++ b/src/query/src/qExtbuffer.c @@ -46,7 +46,7 @@ tExtMemBuffer* createExtMemBuffer(int32_t inMemSize, int32_t elemSize, int32_t p SExtFileInfo *pFMeta = &pMemBuffer->fileMeta; - pFMeta->pageSize = DEFAULT_PAGE_SIZE; + //pFMeta->pageSize = DEFAULT_PAGE_SIZE; pFMeta->flushoutData.nAllocSize = 4; pFMeta->flushoutData.nLength = 0; diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c index e737db6edabf06dfb6c458364755d192bc8b8694..95c7f81ed68d0ef8f303ee45deda89e347d163d9 100644 --- a/src/query/src/qPlan.c +++ b/src/query/src/qPlan.c @@ -77,7 +77,7 @@ static SQueryNode* createQueryNode(int32_t type, const char* name, SQueryNode** pGroupbyExpr->tableIndex = p->tableIndex; pGroupbyExpr->orderType = p->orderType; - pGroupbyExpr->orderIndex = p->orderIndex; + //pGroupbyExpr->orderIndex = p->orderIndex; pGroupbyExpr->numOfGroupCols = p->numOfGroupCols; pGroupbyExpr->columnInfo = taosArrayDup(p->columnInfo); pNode->pExtInfo = pGroupbyExpr; diff --git a/src/query/src/qResultbuf.c b/src/query/src/qResultbuf.c index 05ac5f7dc579aa788538ead5b7be2ff72926ea12..8610aaeeb20279b1a898bee78403ed8c8cd18fee 100644 --- a/src/query/src/qResultbuf.c +++ b/src/query/src/qResultbuf.c @@ -20,7 +20,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pa pResBuf->pageSize = pagesize; pResBuf->numOfPages = 0; // all pages are in buffer in the first place pResBuf->totalBufSize = 0; - pResBuf->inMemPages = inMemBufSize/pagesize; // maximum allowed pages, it is a soft limit. + pResBuf->inMemPages = inMemBufSize/pagesize + 1; // maximum allowed pages, it is a soft limit. 
pResBuf->allocateId = -1; pResBuf->comp = true; pResBuf->file = NULL; @@ -28,7 +28,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pa pResBuf->fileSize = 0; // at least more than 2 pages must be in memory - assert(inMemBufSize >= pagesize * 2); + // assert(inMemBufSize >= pagesize * 2); pResBuf->lruList = tdListNew(POINTER_BYTES); @@ -257,7 +257,7 @@ static char* evicOneDataPage(SDiskbasedResultBuf* pResultBuf) { int32_t prev = pResultBuf->inMemPages; // increase by 50% of previous mem pages - pResultBuf->inMemPages = (int32_t)(pResultBuf->inMemPages * 1.5f); + pResultBuf->inMemPages = (int32_t)(pResultBuf->inMemPages * 1.5f) + 1; // if pResultBuf->inMemPages == 1, *1.5 always == 1 qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pResultBuf, prev, pResultBuf->inMemPages, pResultBuf->pageSize); diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 3ada2b76c7d085904c5a84f284f2a0f64efa028e..22bdefd59ef8844a560bb2944f8e61ad15f5f27f 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -33,7 +33,7 @@ typedef struct SCompSupporter { } SCompSupporter; int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, bool stable) { - if (pQueryAttr && (!stable)) { + if (pQueryAttr && (!stable)) { // if table is stable, no need return more than 1 no in merge stage for (int16_t i = 0; i < pQueryAttr->numOfOutput; ++i) { if (pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_TOP || pQueryAttr->pExpr1[i].base.functionId == TSDB_FUNC_BOTTOM || @@ -42,6 +42,9 @@ int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, boo return (int32_t)pQueryAttr->pExpr1[i].base.param[0].i64; } } + if (pQueryAttr->uniqueQuery){ + return MAX_UNIQUE_RESULT_ROWS; + } } return 1; @@ -85,6 +88,10 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) { for(int32_t i = 0; i < pResultRowInfo->size; ++i) { if (pResultRowInfo->pResult[i]) { 
tfree(pResultRowInfo->pResult[i]->key); + if (pResultRowInfo->pResult[i]->uniqueHash){ + taosHashCleanup(pResultRowInfo->pResult[i]->uniqueHash); + pResultRowInfo->pResult[i]->uniqueHash = NULL; + } } } @@ -150,11 +157,11 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16 if (pResultRow->pageId >= 0) { tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId); - int16_t offset = 0; + int32_t offset = 0; for (int32_t i = 0; i < pRuntimeEnv->pQueryAttr->numOfOutput; ++i) { SResultRowCellInfo *pResultInfo = &pResultRow->pCellInfo[i]; - int16_t size = pRuntimeEnv->pQueryAttr->pExpr1[i].base.resType; + int32_t size = pRuntimeEnv->pQueryAttr->pExpr1[i].base.resBytes; char * s = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResultRow->offset, offset); memset(s, 0, size); @@ -192,7 +199,13 @@ SResultRowPool* initResultRowPool(size_t size) { p->numOfElemPerBlock = 128; p->elemSize = (int32_t) size; - p->blockSize = p->numOfElemPerBlock * p->elemSize; + int64_t tmp = p->elemSize; + tmp *= p->numOfElemPerBlock; + if (tmp > 1024*1024*1024){ + qError("ResultRow blockSize is too large:%" PRId64, tmp); + tmp = 128*1024*1024; + } + p->blockSize = tmp; p->position.pos = 0; p->pData = taosArrayInit(8, POINTER_BYTES); @@ -217,7 +230,6 @@ SResultRow* getNewResultRow(SResultRowPool* p) { } p->position.pos = (p->position.pos + 1)%p->numOfElemPerBlock; - initResultRow(ptr); return ptr; } @@ -451,9 +463,7 @@ int32_t tsDescOrder(const void* p1, const void* p2) { } } -void - -orderTheResultRows(SQueryRuntimeEnv* pRuntimeEnv) { +void orderTheResultRows(SQueryRuntimeEnv* pRuntimeEnv) { __compar_fn_t fn = NULL; if (pRuntimeEnv->pQueryAttr->order.order == TSDB_ORDER_ASC) { fn = tsAscOrder; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index c9a28d3342e46e6d6cd0b8942f3528147bb151de..60c7311d4c0f3d784231fceb8a7e2628a5bd4eda 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -3150,8 +3150,7 @@ static bool 
loadCachedLast(STsdbQueryHandle* pQueryHandle) { } pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes; - - if (pTable->lastCols[j].bytes > 0) { + if (pTable->lastCols[j].bytes > 0) { void* value = pTable->lastCols[j].pData; switch (pColInfo->info.type) { case TSDB_DATA_TYPE_BINARY: @@ -3205,7 +3204,6 @@ static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) { pColInfo = taosArrayGet(pQueryHandle->pColumns, n); pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;; - if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { *(TSKEY *)pData = pTable->lastCols[j].ts; continue; @@ -3231,7 +3229,7 @@ static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) { if (priKey != TSKEY_INITIAL_VAL) { pColInfo = taosArrayGet(pQueryHandle->pColumns, priIdx); pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes; - + *(TSKEY *)pData = priKey; for (int32_t n = 0; n < tgNumOfCols; ++n) { @@ -3241,7 +3239,7 @@ static bool loadCachedLast(STsdbQueryHandle* pQueryHandle) { pColInfo = taosArrayGet(pQueryHandle->pColumns, n); pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;; - + assert (pColInfo->info.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX); if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { @@ -4288,6 +4286,7 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) { } taosHashCleanup(pGroupList->map); + pGroupList->map = NULL; taosArrayDestroy(&pGroupList->pGroupList); pGroupList->numOfTables = 0; } @@ -4662,4 +4661,4 @@ void tsdbAddScanCallback(TsdbQueryHandleT* queryHandle, readover_callback callba pQueryHandle->readover_cb = callback; pQueryHandle->param = param; return ; -} \ No newline at end of file +} diff --git a/src/util/src/terror.c b/src/util/src/terror.c index acbee18ec21b02761295de90ef9ff535a97739d1..e78d1d37ee900268be5cdc7c2883b74284c65639 100644 --- a/src/util/src/terror.c +++ b/src/util/src/terror.c @@ -299,6 +299,7 @@ 
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, "Query buffer limit ha TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INCONSISTAN, "File inconsistance in replica") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_TIME_CONDITION, "One valid time range condition expected") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SYS_ERROR, "System error") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_UNIQUE_RESULT_TOO_LARGE, "Unique result num is too large") // grant TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired")