diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 7270c9349d19a8382318317800143440ab3db92e..2f98c21e4ffc7f586d89e5e5e0fe6994f51c6fd2 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -3958,8 +3958,8 @@ int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd const char* msg4 = "join query does not support group by"; const char* msg5 = "not allowed column type for group by"; const char* msg6 = "tags not allowed for table query"; - const char* msg7 = "not support group by expression"; - const char* msg8 = "normal column can only locate at the end of group by clause"; + //const char* msg7 = "not support group by primary key"; + //const char* msg8 = "normal column can only locate at the end of group by clause"; const char* msg9 = "json tag must be use ->'key'"; const char* msg10 = "non json column can not use ->'key'"; const char* msg11 = "group by json->'key' is too long"; @@ -4070,7 +4070,10 @@ int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd // check if the column type is valid, here only support the bool/tinyint/smallint/bigint group by if (pSchema->type == TSDB_DATA_TYPE_FLOAT || pSchema->type == TSDB_DATA_TYPE_DOUBLE) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5); - } + }/* + if (index.columnIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7); + }*/ tscColumnListInsert(pQueryInfo->colList, index.columnIndex, pTableMeta->id.uid, pSchema); @@ -4085,14 +4088,14 @@ int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd // 1. only one normal column allowed in the group by clause // 2. the normal column in the group by clause can only located in the end position - if (numOfGroupCols > 1) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7); - } + //if (numOfGroupCols > 1) { + // return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7); + //} for(int32_t i = 0; i < num; ++i) { SColIndex* pIndex = taosArrayGet(pGroupExpr->columnInfo, i); - if (TSDB_COL_IS_NORMAL_COL(pIndex->flag) && i != num - 1) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg8); + if (!TSDB_COL_IS_NORMAL_COL(pIndex->flag)) { + //return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg8); } } diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index fc222e8a81a088b0abead471376ae5f5810f9ab8..0bc7b281c87776da98d29c485787a67efecb876a 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -546,10 +546,17 @@ typedef struct SFillOperatorInfo { bool multigroupResult; } SFillOperatorInfo; +typedef struct SGroupbyDataInfo { + int32_t index; // index of col in dataBlock + int32_t type; + int32_t bytes; +} SGroupbyDataInfo; + typedef struct SGroupbyOperatorInfo { SOptrBasicInfo binfo; - int32_t colIndex; - char *prevData; // previous group by value + SArray *pGroupbyDataInfo; + int32_t totalBytes; + char *prevData; // previous data buf } SGroupbyOperatorInfo; typedef struct SSWindowOperatorInfo { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e9eebfc7dd1039709a99c033ac1843daf83e70b8..c9b0e7f7aeaec7fec1494053245e0666f85ff9a3 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -258,7 +258,7 @@ static void getAlignQueryTimeWindow(SQueryAttr *pQueryAttr, int64_t key, int64_t static void setResultBufSize(SQueryAttr* pQueryAttr, SRspResultInfo* pResultInfo); static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable); static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr); -static void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes); +static void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, SGroupbyOperatorInfo *pInfo); static void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t tableGroupId); @@ -1569,56 +1569,132 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey); } +static bool initGroupbyInfo(const SSDataBlock *pSDataBlock, const SGroupbyExpr *pGroupbyExpr, SGroupbyOperatorInfo *pInfo) { + if (pInfo->pGroupbyDataInfo != NULL) { + // no need build group-by info + return true; + } + pInfo->pGroupbyDataInfo = taosArrayInit(pGroupbyExpr->numOfGroupCols, sizeof(SGroupbyDataInfo)); + + for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) { + SColIndex* pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, k); + if (TSDB_COL_IS_TAG(pColIndex->flag)) { + continue; + } + for (int32_t i = 0; i < pSDataBlock->info.numOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pSDataBlock->pDataBlock, i); + if (pColInfo->info.colId == pColIndex->colId) { + int32_t type = pColInfo->info.type; + if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) { + return false; + } + pInfo->totalBytes += pColInfo->info.bytes; + + SGroupbyDataInfo info = {.index = i, .type = pColInfo->info.type, .bytes = pColInfo->info.bytes}; + taosArrayInsert(pInfo->pGroupbyDataInfo, k, &info); + break; + } + if (i == pSDataBlock->info.numOfCols - 1) { + // not found groupby col in dataBlock, error + return false; + } + } + } + pInfo->totalBytes += (int32_t)strlen(MULTI_KEY_DELIM) * pGroupbyExpr->numOfGroupCols; + + return true; +} + +static void buildGroupbyKeyBuf(const SSDataBlock *pSDataBlock, SGroupbyOperatorInfo *pInfo, int32_t rowId, char **buf) { + char *p = calloc(1, pInfo->totalBytes); + if (p == NULL) { + *buf = NULL; + return; + } + *buf = p; + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pGroupbyDataInfo); i++) { + SGroupbyDataInfo *pDataInfo = taosArrayGet(pInfo->pGroupbyDataInfo, i); + + SColumnInfoData* pColData = taosArrayGet(pSDataBlock->pDataBlock, pDataInfo->index); + //TODO(yihaoDeng): handle float & double + char *val = ((char *)pColData->pData) + pDataInfo->bytes * rowId; + if (isNull(val, pDataInfo->type)) { + p += pDataInfo->bytes; + continue; + } + if (IS_VAR_DATA_TYPE(pDataInfo->type)) { + memcpy(p, varDataVal(val), varDataLen(val)); + p += varDataLen(val); + } else { + memcpy(p, val, pDataInfo->bytes); + p += pDataInfo->bytes; + } + + memcpy(p, MULTI_KEY_DELIM, strlen(MULTI_KEY_DELIM)); + p += strlen(MULTI_KEY_DELIM); + } +} + +static bool isGroupbyKeyEqual(void *a, void *b, void *ext) { + SGroupbyOperatorInfo *pInfo = (SGroupbyOperatorInfo *)ext; + if (memcmp(a, b, pInfo->totalBytes) == 0) { + return true; + } + int32_t offset = 0; + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pGroupbyDataInfo); i++) { + SGroupbyDataInfo *pDataInfo = taosArrayGet(pInfo->pGroupbyDataInfo, i); + + char *k1 = (char *)a + offset; + char *k2 = (char *)b + offset; + if (getComparFunc(pDataInfo->type, 0)(k1, k2) != 0) { + return false; + } + offset += pDataInfo->bytes; + offset += (int32_t)strlen(MULTI_KEY_DELIM); + } + return true; +} + static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; STableQueryInfo* item = pRuntimeEnv->current; - SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex); - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - int16_t bytes = pColInfoData->info.bytes; - int16_t type = pColInfoData->info.type; - if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) { + if (!initGroupbyInfo(pSDataBlock, pRuntimeEnv->pQueryAttr->pGroupbyExpr, pInfo)) { qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_QID(pRuntimeEnv)); return; } + //realloc pRuntimeEnv->keyBuf + pRuntimeEnv->keyBuf = realloc(pRuntimeEnv->keyBuf, pInfo->totalBytes + sizeof(int64_t) + POINTER_BYTES); SColumnInfoData* pFirstColData = taosArrayGet(pSDataBlock->pDataBlock, 0); int64_t* tsList = (pFirstColData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (int64_t*) pFirstColData->pData:NULL; STimeWindow w = TSWINDOW_INITIALIZER; - int32_t num = 0; + char *key = NULL; + int16_t num = 0; + int32_t type = 0; for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { - char* val = ((char*)pColInfoData->pData) + bytes * j; - - // Compare with the previous row of this column, and do not set the output buffer again if they are identical. + buildGroupbyKeyBuf(pSDataBlock, pInfo, j, &key); + if (!key) { continue; } if (pInfo->prevData == NULL) { - pInfo->prevData = malloc(bytes); - memcpy(pInfo->prevData, val, bytes); + // first row of + pInfo->prevData = key; num++; continue; - } - - if (IS_VAR_DATA_TYPE(type)) { - int32_t len = varDataLen(val); - if(len == varDataLen(pInfo->prevData) && memcmp(varDataVal(pInfo->prevData), varDataVal(val), len) == 0) { - num++; - continue; - } - } else { - if (memcmp(pInfo->prevData, val, bytes) == 0) { - num++; - continue; - } + } else if (isGroupbyKeyEqual(pInfo->prevData, key, pInfo)) { + num++; + tfree(key); + continue; } if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) { - setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, pInfo->prevData, bytes); + setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, pInfo); } - int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, bytes, item->groupIndex); + int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, pInfo->totalBytes, item->groupIndex); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); } @@ -1626,23 +1702,25 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, j - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput); num = 1; - memcpy(pInfo->prevData, val, bytes); + + tfree(pInfo->prevData); + pInfo->prevData = key; } if (num > 0) { - char* val = ((char*)pColInfoData->pData) + bytes * (pSDataBlock->info.rows - num); - memcpy(pInfo->prevData, val, bytes); - - if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) { - setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, val, bytes); - } - - int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, val, type, bytes, item->groupIndex); - if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); + buildGroupbyKeyBuf(pSDataBlock, pInfo, pSDataBlock->info.rows - num, &key); + if (key) { + tfree(pInfo->prevData); + pInfo->prevData = key; + if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) { + setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, pInfo); + } + int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, pInfo->totalBytes, item->groupIndex); + if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); + } + doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, pSDataBlock->info.rows - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput); } - - doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, pSDataBlock->info.rows - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput); } tfree(pInfo->prevData); @@ -1717,22 +1795,22 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf pSDataBlock->info.rows, pOperator->numOfOutput); } -static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { - if (IS_VAR_DATA_TYPE(type)) { - if (pResultRow->key == NULL) { - pResultRow->key = malloc(varDataTLen(pData)); - varDataCopy(pResultRow->key, pData); - } else { - assert(memcmp(pResultRow->key, pData, varDataTLen(pData)) == 0); - } - } else { - int64_t v = -1; - GET_TYPED_DATA(v, int64_t, type, pData); - - pResultRow->win.skey = v; - pResultRow->win.ekey = v; - } -} +//static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { +// if (IS_VAR_DATA_TYPE(type)) { +// if (pResultRow->key == NULL) { +// pResultRow->key = malloc(varDataTLen(pData)); +// varDataCopy(pResultRow->key, pData); +// } else { +// assert(memcmp(pResultRow->key, pData, varDataTLen(pData)) == 0); +// } +// } else { +// int64_t v = -1; +// GET_TYPED_DATA(v, int64_t, type, pData); +// +// pResultRow->win.skey = v; +// pResultRow->win.ekey = v; +// } +//} static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) { SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; @@ -1744,16 +1822,16 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasic // not assign result buffer yet, add new result buffer, TODO remove it char* d = pData; int16_t len = bytes; - if (IS_VAR_DATA_TYPE(type)) { - d = varDataVal(pData); - len = varDataLen(pData); - } + //if (IS_VAR_DATA_TYPE(type)) { + // d = varDataVal(pData); + // len = varDataLen(pData); + //} int64_t tid = 0; SResultRow *pResultRow = doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, d, len, true, groupIndex); assert (pResultRow != NULL); - setResultRowKey(pResultRow, pData, type); + //setResultRowKey(pResultRow, pData, type); if (pResultRow->pageId == -1) { int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, groupIndex, pRuntimeEnv->pQueryAttr->resultRowSize); if (ret != 0) { @@ -4161,7 +4239,7 @@ void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx } -void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes) { +void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, SGroupbyOperatorInfo *pInfo) { SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t numOfExprs = pQueryAttr->numOfOutput; @@ -4174,6 +4252,20 @@ void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunction pCtx[i].param[0].arr = NULL; pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int + // find colid in dataBlock + int32_t bytes, offset = 0; + char* val = NULL; + for (int32_t idx = 0; idx < taosArrayGetSize(pInfo->pGroupbyDataInfo); idx++) { + SGroupbyDataInfo *pDataInfo = taosArrayGet(pInfo->pGroupbyDataInfo, idx); + if (pDataInfo->index == pExpr1->colInfo.colId) { + bytes = pDataInfo->bytes; + val = pInfo->prevData + offset; + break; + } + offset += pDataInfo->bytes; + } + if (val == NULL) { continue; } + // TODO use hash to speedup this loop int32_t numOfGroup = (int32_t)taosArrayGetSize(pRuntimeEnv->prevResult); for (int32_t j = 0; j < numOfGroup; ++j) { @@ -7074,10 +7166,6 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pRuntimeEnv->pQueryAttr->order.order); setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); - if (pInfo->colIndex == -1) { - pInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQueryAttr->pGroupbyExpr, pBlock); - } - doHashGroupbyAgg(pOperator, pInfo, pBlock); } @@ -7352,7 +7440,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); - + taosArrayDestroy(&pInfo->pGroupbyDataInfo); if (pInfo->prevData) { tfree(pInfo->prevData); } @@ -7829,7 +7917,7 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato return NULL; } - pInfo->colIndex = -1; // group by column index + //pInfo->colIndex = -1; // group by column index pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; diff --git a/tests/script/general/parser/groupby.sim b/tests/script/general/parser/groupby.sim index 9baf028bc8233c0fd6c3ada9392bca717270b3be..cbebfaae8eafaaff32f8840031ae2e01bf764782 100644 --- a/tests/script/general/parser/groupby.sim +++ b/tests/script/general/parser/groupby.sim @@ -783,13 +783,13 @@ if $data11 != 2 then return -1 endi -sql_error select count(*) from m1 group by tbname,k,f1; -sql_error select count(*) from m1 group by tbname,k,a; -sql_error select count(*) from m1 group by k, tbname; -sql_error select count(*) from m1 group by k,f1; +#sql_error select count(*) from m1 group by tbname,k,f1; +#sql_error select count(*) from m1 group by tbname,k,a; +#sql_error select count(*) from m1 group by k, tbname; +#sql_error select count(*) from m1 group by k,f1; sql_error select count(*) from tm0 group by tbname; sql_error select count(*) from tm0 group by a; -sql_error select count(*) from tm0 group by k,f1; +#sql_error select count(*) from tm0 group by k,f1; sql_error select count(*),f1 from m1 group by tbname,k;