diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 573933388699d14ee039f19054830a2e105c126b..a6db366643eb45cb425d938618faeb3915e70264 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1948,10 +1948,10 @@ bool isValidDistinctSql(SQueryInfo* pQueryInfo) { && (pQueryInfo->type & TSDB_QUERY_TYPE_TABLE_QUERY) != TSDB_QUERY_TYPE_TABLE_QUERY) { return false; } - if (tscNumOfExprs(pQueryInfo) == 1){ - return true; - } - return false; + //if (tscNumOfExprs(pQueryInfo) == 1){ + // return true; + //} + return true; } static bool hasNoneUserDefineExpr(SQueryInfo* pQueryInfo) { diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index ce70a9ba4ab4d2dee8cab97142cac96a9a41e9cf..8249a84e7f3ca0c30f33fe6fa836b7169907bf70 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -508,13 +508,21 @@ typedef struct SStateWindowOperatorInfo { bool reptScan; } SStateWindowOperatorInfo ; +typedef struct SDistinctDataInfo { + int32_t index; + int32_t type; + int32_t bytes; +} SDistinctDataInfo; + typedef struct SDistinctOperatorInfo { SHashObj *pSet; SSDataBlock *pRes; bool recordNullVal; //has already record the null value, no need to try again int64_t threshold; int64_t outputCapacity; - int32_t colIndex; + int32_t totalBytes; + char* buf; + SArray* pDistinctDataInfo; } SDistinctOperatorInfo; struct SGlobalMerger; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 5323b4306f95b30503f89903e9d1503553ce7398..fa54e9dc6b24e4877069be3c299285016994434b 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -44,6 +44,10 @@ #define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0} +#define MULTI_KEY_DELIM "-" + +#define HASH_CAPACITY_LIMIT 10000000 + #define TIME_WINDOW_COPY(_dst, _src) do {\ (_dst).skey = (_src).skey;\ (_dst).ekey = (_src).ekey;\ @@ -6109,6 +6113,8 @@ static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) { static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*) param; taosHashCleanup(pInfo->pSet); + tfree(pInfo->buf); + taosArrayDestroy(pInfo->pDistinctDataInfo); pInfo->pRes = destroyOutputBuf(pInfo->pRes); } @@ -6600,20 +6606,65 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf return pOperator; } +static bool initMultiDistinctInfo(SDistinctOperatorInfo *pInfo, SOperatorInfo* pOperator, SSDataBlock *pBlock) { + if (taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput) { + // distinct info already inited + return true; + } + for (int i = 0; i < pOperator->numOfOutput; i++) { + pInfo->totalBytes += pOperator->pExpr[i].base.colBytes; + } + for (int i = 0; i < pOperator->numOfOutput; i++) { + int numOfBlock = taosArrayGetSize(pBlock->pDataBlock); + assert(i < numOfBlock); + for (int j = 0; j < numOfBlock; j++) { + SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, j); + if (pColDataInfo->info.colId == pOperator->pExpr[i].base.resColId) { + SDistinctDataInfo item = {.index = j, .type = pColDataInfo->info.type, .bytes = pColDataInfo->info.bytes}; + taosArrayInsert(pInfo->pDistinctDataInfo, i, &item); + } + } + } + pInfo->totalBytes += strlen(MULTI_KEY_DELIM) * (pOperator->numOfOutput); + pInfo->buf = calloc(1, pInfo->totalBytes); + return taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput ? true : false; +} +static void buildMultiDistinctKey(SDistinctOperatorInfo *pInfo, SSDataBlock *pBlock, int32_t rowId) { + char *p = pInfo->buf; + memset(p, 0, pInfo->totalBytes); + + for (int i = 0; i < taosArrayGetSize(pInfo->pDistinctDataInfo); i++) { + SDistinctDataInfo* pDistDataInfo = (SDistinctDataInfo *)taosArrayGet(pInfo->pDistinctDataInfo, i); + SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); + char *val = ((char *)pColDataInfo->pData) + pColDataInfo->info.bytes * rowId; + if (isNull(val, pDistDataInfo->type)) { + p += pDistDataInfo->bytes; + continue; + } + if (IS_VAR_DATA_TYPE(pDistDataInfo->type)) { + memcpy(p, varDataVal(val), varDataLen(val)); + p += varDataLen(val); + } else { + memcpy(p, val, pDistDataInfo->bytes); + p += pDistDataInfo->bytes; + } + memcpy(p, MULTI_KEY_DELIM, strlen(MULTI_KEY_DELIM)); + p += strlen(MULTI_KEY_DELIM); + } +} static SSDataBlock* hashDistinct(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { return NULL; } - - + SDistinctOperatorInfo* pInfo = pOperator->info; SSDataBlock* pRes = pInfo->pRes; - pRes->info.rows = 0; SSDataBlock* pBlock = NULL; + while(1) { publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); @@ -6624,63 +6675,44 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { pOperator->status = OP_EXEC_DONE; break; } - if (pInfo->colIndex == -1) { - for (int i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) { - SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, i); - if (pColDataInfo->info.colId == pOperator->pExpr[0].base.resColId) { - pInfo->colIndex = i; - break; - } - } - } - if (pInfo->colIndex == -1) { + if (!initMultiDistinctInfo(pInfo, pOperator, pBlock)) { setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); pOperator->status = OP_EXEC_DONE; - return NULL; + break; } - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex); - - int16_t bytes = pColInfoData->info.bytes; - int16_t type = pColInfoData->info.type; - - // ensure the output buffer size - SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, 0); + // ensure result output buf if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) { int32_t newSize = pRes->info.rows + pBlock->info.rows; - char* tmp = realloc(pResultColInfoData->pData, newSize * bytes); - if (tmp == NULL) { - return NULL; - } else { - pResultColInfoData->pData = tmp; - pInfo->outputCapacity = newSize; + for (int i = 0; i < taosArrayGetSize(pRes->pDataBlock); i++) { + SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, i); + SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, i); + char* tmp = realloc(pResultColInfoData->pData, newSize * pDistDataInfo->bytes); + if (tmp == NULL) { + return NULL; + } else { + pResultColInfoData->pData = tmp; + } } + pInfo->outputCapacity = newSize; } - for(int32_t i = 0; i < pBlock->info.rows; ++i) { - char* val = ((char*)pColInfoData->pData) + bytes * i; - if (isNull(val, type)) { - continue; - } - char* p = val; - size_t keyLen = 0; - if (IS_VAR_DATA_TYPE(pOperator->pExpr->base.colType)) { - tstr* var = (tstr*)(val); - p = var->data; - keyLen = varDataLen(var); - } else { - keyLen = bytes; - } + for (int32_t i = 0; i < pBlock->info.rows; i++) { + buildMultiDistinctKey(pInfo, pBlock, i); + if (taosHashGet(pInfo->pSet, pInfo->buf, pInfo->totalBytes) == NULL) { + int32_t dummy; + taosHashPut(pInfo->pSet, pInfo->buf, pInfo->totalBytes, &dummy, sizeof(dummy)); + for (int j = 0; j < taosArrayGetSize(pRes->pDataBlock); j++) { + SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, j); // distinct meta info + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); //src + SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist - int dummy; - void* res = taosHashGet(pInfo->pSet, p, keyLen); - if (res == NULL) { - taosHashPut(pInfo->pSet, p, keyLen, &dummy, sizeof(dummy)); - char* start = pResultColInfoData->pData + bytes * pInfo->pRes->info.rows; - memcpy(start, val, bytes); + char* val = ((char*)pColInfoData->pData) + pDistDataInfo->bytes * i; + char *start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows; + memcpy(start, val, pDistDataInfo->bytes); + } pRes->info.rows += 1; - } + } } - if (pRes->info.rows >= pInfo->threshold) { break; } @@ -6691,11 +6723,14 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo)); - pInfo->colIndex = -1; - pInfo->threshold = 10000000; // distinct result threshold - pInfo->outputCapacity = 4096; - pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(pExpr->base.colType), false, HASH_NO_LOCK); + pInfo->totalBytes = 0; + pInfo->buf = NULL; + pInfo->threshold = HASH_CAPACITY_LIMIT; // distinct result threshold + pInfo->outputCapacity = 4096; + pInfo->pDistinctDataInfo = taosArrayInit(numOfOutput, sizeof(SDistinctDataInfo)); + pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity); + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "DistinctOperator";