diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 03b2b696c2e4b0ff527fc75daf932067a4d15626..5fd0852e50bb918f9d18a5dc362818be8ee3642b 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1944,20 +1944,6 @@ static void addPrimaryTsColIntoResult(SQueryInfo* pQueryInfo, SSqlCmd* pCmd) { pQueryInfo->type |= TSDB_QUERY_TYPE_PROJECTION_QUERY; } -bool isValidDistinctSql(SQueryInfo* pQueryInfo) { - if (pQueryInfo == NULL) { - return false; - } - if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_QUERY) != TSDB_QUERY_TYPE_STABLE_QUERY - && (pQueryInfo->type & TSDB_QUERY_TYPE_TABLE_QUERY) != TSDB_QUERY_TYPE_TABLE_QUERY) { - return false; - } - if (tscNumOfExprs(pQueryInfo) == 1){ - return true; - } - return false; -} - static bool hasNoneUserDefineExpr(SQueryInfo* pQueryInfo) { size_t numOfExprs = taosArrayGetSize(pQueryInfo->exprList); for (int32_t i = 0; i < numOfExprs; ++i) { @@ -2047,8 +2033,11 @@ int32_t validateSelectNodeList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pS const char* msg1 = "too many items in selection clause"; const char* msg2 = "functions or others can not be mixed up"; const char* msg3 = "not support query expression"; - const char* msg4 = "only support distinct one column or tag"; + const char* msg4 = "not support distinct mixed with proj/agg func"; const char* msg5 = "invalid function name"; + const char* msg6 = "not support distinct mixed with join"; + const char* msg7 = "not support distinct mixed with groupby"; + const char* msg8 = "not support distinct in nest query"; // too many result columns not support order by in query if (taosArrayGetSize(pSelNodeList) > TSDB_MAX_COLUMNS) { @@ -2060,17 +2049,22 @@ int32_t validateSelectNodeList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pS } bool hasDistinct = false; - size_t numOfExpr = taosArrayGetSize(pSelNodeList); + bool hasAgg = false; + size_t numOfExpr = taosArrayGetSize(pSelNodeList); + int32_t distIdx = -1; for (int32_t i = 0; i < numOfExpr; ++i) { int32_t outputIndex = (int32_t)tscNumOfExprs(pQueryInfo); tSqlExprItem* pItem = taosArrayGet(pSelNodeList, i); - if (hasDistinct == false) { - hasDistinct = (pItem->distinct == true); - } + hasDistinct = (pItem->distinct == true); + distIdx = hasDistinct ? i : -1; + } int32_t type = pItem->pNode->type; if (type == SQL_NODE_SQLFUNCTION) { + hasAgg = true; + if (hasDistinct) break; + pItem->pNode->functionId = isValidFunction(pItem->pNode->Expr.operand.z, pItem->pNode->Expr.operand.n); SUdfInfo* pUdfInfo = NULL; if (pItem->pNode->functionId < 0) { @@ -2106,9 +2100,21 @@ int32_t validateSelectNodeList(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SArray* pS } } + + //TODO(dengyihao), refactor as function + //handle distinct func mixed with other func if (hasDistinct == true) { - if (!isValidDistinctSql(pQueryInfo) ) { + if (distIdx != 0 || hasAgg) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4); + } + if (joinQuery) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); + } + if (pQueryInfo->groupbyExpr.numOfGroupCols != 0) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7); + } + if (pQueryInfo->pDownstream != NULL) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg8); } pQueryInfo->distinct = true; } @@ -8682,12 +8688,12 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS pSub->pUdfInfo = pUdfInfo; pSub->udfCopy = true; + pSub->pDownstream = pQueryInfo; int32_t code = validateSqlNode(pSql, p, pSub); if (code != TSDB_CODE_SUCCESS) { return code; } - pSub->pDownstream = pQueryInfo; // create dummy table meta info STableMetaInfo* pTableMetaInfo1 = calloc(1, sizeof(STableMetaInfo)); @@ -8795,6 +8801,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf if (validateGroupbyNode(pQueryInfo, pSqlNode->pGroupby, pCmd) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_OPERATION; } + if (validateSelectNodeList(pCmd, pQueryInfo, pSqlNode->pSelNodeList, false, timeWindowQuery, true) != TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 0a2c2bd5f3f542fca9c6ef5fbfa2bf6546c956c2..38927be4ac1ab927dea17b02ebfee2db9d2cad7a 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -409,7 +409,7 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) { if ((TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY | TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) || - (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY))) { + (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY)) || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->distinct)) { // do nothing in case of super table subquery } else { pSql->retry += 1; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 53ad775852b34d5a345cba04ea07c868e2379d62..bef1d242c9f1e249dbd7d66b7ad029666faaf2d3 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1522,7 +1522,6 @@ void tscFreeSqlObj(SSqlObj* pSql) { tscFreeSqlResult(pSql); tscResetSqlCmd(pCmd, false, pSql->self); - memset(pCmd->payload, 0, (size_t)pCmd->allocSize); tfree(pCmd->payload); pCmd->allocSize = 0; @@ -3617,7 +3616,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t pNewQueryInfo->prjOffset = pQueryInfo->prjOffset; pNewQueryInfo->numOfTables = 0; pNewQueryInfo->pTableMetaInfo = NULL; - pNewQueryInfo->bufLen = pQueryInfo->bufLen; + pNewQueryInfo->bufLen = pQueryInfo->bufLen; + pNewQueryInfo->distinct = pQueryInfo->distinct; pNewQueryInfo->buf = malloc(pQueryInfo->bufLen); if (pNewQueryInfo->buf == NULL) { diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 62f369d98777fa79bf40300d178f5f5eeb04a2a4..10673beb922319d19a9fcc28689193fc34d8ae46 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -60,6 +60,7 @@ extern char tsLocale[]; extern char tsCharset[]; // default encode string extern int8_t tsEnableCoreFile; extern int32_t tsCompressMsgSize; +extern int32_t tsMaxNumOfDistinctResults; extern char tsTempDir[]; //query buffer management diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index d1b816f122c40fb1c5dea9ec8fa9f0406142d5de..c7e7a7362be2a2ae8cfa3ae8ed0eb4fa332a4c7f 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -87,6 +87,9 @@ int32_t tsMaxNumOfOrderedResults = 100000; // 10 ms for sliding time, the value will changed in case of time precision changed int32_t tsMinSlidingTime = 10; +// the maxinum number of distict query result +int32_t tsMaxNumOfDistinctResults = 1000 * 10000; + // 1 us for interval time range, changed accordingly int32_t tsMinIntervalTime = 1; @@ -544,6 +547,17 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + cfg.option = "maxNumOfDistinctRes"; + cfg.ptr = &tsMaxNumOfDistinctResults; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT; + cfg.minValue = 10*10000; + cfg.maxValue = 10000*10000; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + + cfg.option = "numOfMnodes"; cfg.ptr = &tsNumOfMnodes; cfg.valType = TAOS_CFG_VTYPE_INT32; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 2c57310a2f31807a5177abe3691c6e24d3e017b0..84f00891e5e75bc7b3c7a32f3de0d040bd4c5cb7 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -515,13 +515,21 @@ typedef struct SStateWindowOperatorInfo { bool reptScan; } SStateWindowOperatorInfo; +typedef struct SDistinctDataInfo { + int32_t index; + int32_t type; + int32_t bytes; +} SDistinctDataInfo; + typedef struct SDistinctOperatorInfo { SHashObj *pSet; SSDataBlock *pRes; bool recordNullVal; //has already record the null value, no need to try again int64_t threshold; int64_t outputCapacity; - int32_t colIndex; + int32_t totalBytes; + char* buf; + SArray* pDistinctDataInfo; } SDistinctOperatorInfo; struct SGlobalMerger; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e0761b38b387ff24585073976d655341eea6d3ac..d799d56cea8e0956ea1abc3fcc898d40f3e38916 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -43,6 +43,10 @@ #define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0} +#define MULTI_KEY_DELIM "-" + +#define HASH_CAPACITY_LIMIT 10000000 + #define TIME_WINDOW_COPY(_dst, _src) do {\ (_dst).skey = (_src).skey;\ (_dst).ekey = (_src).ekey;\ @@ -3569,6 +3573,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo; int64_t tid = 0; + pRuntimeEnv->keyBuf = realloc(pRuntimeEnv->keyBuf, sizeof(tid) + sizeof(int64_t) + POINTER_BYTES); SResultRow* pRow = doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, uid); for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { @@ -6528,6 +6533,8 @@ static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) { static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*) param; taosHashCleanup(pInfo->pSet); + tfree(pInfo->buf); + taosArrayDestroy(pInfo->pDistinctDataInfo); pInfo->pRes = destroyOutputBuf(pInfo->pRes); } @@ -7069,18 +7076,64 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf return pOperator; } +static bool initMultiDistinctInfo(SDistinctOperatorInfo *pInfo, SOperatorInfo* pOperator, SSDataBlock *pBlock) { + if (taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput) { + // distinct info already inited + return true; + } + for (int i = 0; i < pOperator->numOfOutput; i++) { + pInfo->totalBytes += pOperator->pExpr[i].base.colBytes; + } + for (int i = 0; i < pOperator->numOfOutput; i++) { + int numOfBlock = (int)(taosArrayGetSize(pBlock->pDataBlock)); + assert(i < numOfBlock); + for (int j = 0; j < numOfBlock; j++) { + SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, j); + if (pColDataInfo->info.colId == pOperator->pExpr[i].base.resColId) { + SDistinctDataInfo item = {.index = j, .type = pColDataInfo->info.type, .bytes = pColDataInfo->info.bytes}; + taosArrayInsert(pInfo->pDistinctDataInfo, i, &item); + } + } + } + pInfo->totalBytes += (int32_t)strlen(MULTI_KEY_DELIM) * (pOperator->numOfOutput); + pInfo->buf = calloc(1, pInfo->totalBytes); + return taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput ? true : false; +} +static void buildMultiDistinctKey(SDistinctOperatorInfo *pInfo, SSDataBlock *pBlock, int32_t rowId) { + char *p = pInfo->buf; + memset(p, 0, pInfo->totalBytes); + + for (int i = 0; i < taosArrayGetSize(pInfo->pDistinctDataInfo); i++) { + SDistinctDataInfo* pDistDataInfo = (SDistinctDataInfo *)taosArrayGet(pInfo->pDistinctDataInfo, i); + SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); + char *val = ((char *)pColDataInfo->pData) + pColDataInfo->info.bytes * rowId; + if (isNull(val, pDistDataInfo->type)) { + p += pDistDataInfo->bytes; + continue; + } + if (IS_VAR_DATA_TYPE(pDistDataInfo->type)) { + memcpy(p, varDataVal(val), varDataLen(val)); + p += varDataLen(val); + } else { + memcpy(p, val, pDistDataInfo->bytes); + p += pDistDataInfo->bytes; + } + memcpy(p, MULTI_KEY_DELIM, strlen(MULTI_KEY_DELIM)); + p += strlen(MULTI_KEY_DELIM); + } +} static SSDataBlock* hashDistinct(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { return NULL; } - SDistinctOperatorInfo* pInfo = pOperator->info; SSDataBlock* pRes = pInfo->pRes; pRes->info.rows = 0; SSDataBlock* pBlock = NULL; + while(1) { publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); @@ -7091,63 +7144,44 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { pOperator->status = OP_EXEC_DONE; break; } - if (pInfo->colIndex == -1) { - for (int i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) { - SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, i); - if (pColDataInfo->info.colId == pOperator->pExpr[0].base.resColId) { - pInfo->colIndex = i; - break; - } - } - } - if (pInfo->colIndex == -1) { + if (!initMultiDistinctInfo(pInfo, pOperator, pBlock)) { setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); pOperator->status = OP_EXEC_DONE; - return NULL; + break; } - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex); - - int16_t bytes = pColInfoData->info.bytes; - int16_t type = pColInfoData->info.type; - - // ensure the output buffer size - SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, 0); + // ensure result output buf if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) { int32_t newSize = pRes->info.rows + pBlock->info.rows; - char* tmp = realloc(pResultColInfoData->pData, newSize * bytes); - if (tmp == NULL) { - return NULL; - } else { - pResultColInfoData->pData = tmp; - pInfo->outputCapacity = newSize; + for (int i = 0; i < taosArrayGetSize(pRes->pDataBlock); i++) { + SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, i); + SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, i); + char* tmp = realloc(pResultColInfoData->pData, newSize * pDistDataInfo->bytes); + if (tmp == NULL) { + return NULL; + } else { + pResultColInfoData->pData = tmp; + } } + pInfo->outputCapacity = newSize; } - for(int32_t i = 0; i < pBlock->info.rows; ++i) { - char* val = ((char*)pColInfoData->pData) + bytes * i; - if (isNull(val, type)) { - continue; - } - char* p = val; - size_t keyLen = 0; - if (IS_VAR_DATA_TYPE(pOperator->pExpr->base.colType)) { - tstr* var = (tstr*)(val); - p = var->data; - keyLen = varDataLen(var); - } else { - keyLen = bytes; - } + for (int32_t i = 0; i < pBlock->info.rows; i++) { + buildMultiDistinctKey(pInfo, pBlock, i); + if (taosHashGet(pInfo->pSet, pInfo->buf, pInfo->totalBytes) == NULL) { + int32_t dummy; + taosHashPut(pInfo->pSet, pInfo->buf, pInfo->totalBytes, &dummy, sizeof(dummy)); + for (int j = 0; j < taosArrayGetSize(pRes->pDataBlock); j++) { + SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, j); // distinct meta info + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); //src + SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist - int dummy; - void* res = taosHashGet(pInfo->pSet, p, keyLen); - if (res == NULL) { - taosHashPut(pInfo->pSet, p, keyLen, &dummy, sizeof(dummy)); - char* start = pResultColInfoData->pData + bytes * pInfo->pRes->info.rows; - memcpy(start, val, bytes); + char* val = ((char*)pColInfoData->pData) + pDistDataInfo->bytes * i; + char *start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows; + memcpy(start, val, pDistDataInfo->bytes); + } pRes->info.rows += 1; - } + } } - if (pRes->info.rows >= pInfo->threshold) { break; } @@ -7158,11 +7192,14 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo)); - pInfo->colIndex = -1; - pInfo->threshold = 10000000; // distinct result threshold - pInfo->outputCapacity = 4096; - pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(pExpr->base.colType), false, HASH_NO_LOCK); + pInfo->totalBytes = 0; + pInfo->buf = NULL; + pInfo->threshold = tsMaxNumOfDistinctResults; // distinct result threshold + pInfo->outputCapacity = 4096; + pInfo->pDistinctDataInfo = taosArrayInit(numOfOutput, sizeof(SDistinctDataInfo)); + pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity); + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "DistinctOperator"; diff --git a/tests/script/general/parser/distinct.sim b/tests/script/general/parser/distinct.sim new file mode 100644 index 0000000000000000000000000000000000000000..e0eb74b5a57c7ef762155d2afae6b02e04213050 --- /dev/null +++ b/tests/script/general/parser/distinct.sim @@ -0,0 +1,88 @@ +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c walLevel -v 1 +system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 5 +system sh/exec.sh -n dnode1 -s start +sleep 100 +sql connect + +$dbPrefix = sav_db +$tbPrefix = sav_tb +$stbPrefix = sav_stb +$tbNum = 20 +$rowNum = 10 +$totalNum = $tbNum * $rowNum +$ts0 = 1537146000000 +$delta = 600000 +print ========== alter.sim +$i = 0 +$db = $dbPrefix +$stb = $stbPrefix + +sql drop database if exists $db +sql create database $db +sql use $db +print ====== create tables +sql create table $stb (ts timestamp, c1 int) tags(t1 int, t2 int) + +$i = 0 +$ts = $ts0 +while $i < $tbNum + $tb = $tbPrefix . $i + sql create table $tb using $stb tags( $i , 0 ) + $i = $i + 1 + sql insert into $tb values('2015-08-18 00:00:00', 1); + sql insert into $tb values('2015-08-18 00:06:00', 2); + sql insert into $tb values('2015-08-18 00:12:00', 3); + sql insert into $tb values('2015-08-18 00:18:00', 4); + sql insert into $tb values('2015-08-18 00:24:00', 5); + sql insert into $tb values('2015-08-18 00:30:00', 6); +endw +$i = 0 +$tb = $tbPrefix . $i + +print ====== table created + +#### select distinct tag +sql select distinct t1 from $stb +if $rows != $tbNum then + return -1 +endi + +#### select distinct tag +sql select distinct t2 from $stb +if $rows != 1 then + print $rows + return -1 +endi + +#### select multi normal column +sql select distinct ts, c1 from $stb +if $rows != 6 then + return -1 +endi + +#### select multi column +sql select distinct ts from $stb +if $rows != 6 then + return -1 +endi + +### select multi normal column +### select distinct multi column on sub table + +sql select distinct ts, c1 from $tb +if $rows != 6 then + return -1 +endi + + +### select distinct +sql drop database $db +sql show databases +if $rows != 0 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT