From 35f074927f3bf3ff982976fd6a9a9cb632972e9d Mon Sep 17 00:00:00 2001
From: Haojun Liao
Date: Mon, 26 Apr 2021 17:01:36 +0800
Subject: [PATCH] [td-225]fix the bug found by crash_gen

---
Reviewer notes (ignored by git-am; not part of the commit message):
 * Line breaks and indentation were restored from a whitespace-collapsed copy
   of this patch. Spacing, column alignment and the position of blank context
   lines are approximate; apply with --ignore-whitespace if a hunk is rejected.
 * The added code in the last qExecutor.c hunk differs from the commit named in
   the "From" line (the index blob id of that file is therefore stale):
     - createDistinctOperatorInfo() now initialises pInfo->threshold. It was
       left at the calloc'ed 0, so hashDistinct() stopped after every upstream
       block and returned NULL (seen by the caller as end of data) whenever
       that block contributed no new value.
     - hashDistinct() leaves its loop with `break` when the upstream is
       exhausted, so rows gathered earlier in the same call are returned.
     - the realloc size is computed in size_t.
 * Not changed: recordNullVal is still unused, NULL values are never emitted,
   and a realloc failure is still reported as a plain NULL result.

 src/client/src/tscAsync.c                 |  10 ++-
 src/client/src/tscParseInsert.c           |   1 +
 src/client/src/tscUtil.c                  |   1 +
 src/query/inc/qExecutor.h                 |  15 +++-
 src/query/src/qExecutor.c                 | 111 ++++++++++++++++++++--
 src/query/src/qPlan.c                     |  31 ++++---
 tests/script/general/parser/testSuite.sim |   3 +-
 7 files changed, 149 insertions(+), 23 deletions(-)

diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c
index a17ae913d0..a972f194c5 100644
--- a/src/client/src/tscAsync.c
+++ b/src/client/src/tscAsync.c
@@ -447,18 +447,22 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
     }
 
     if (pCmd->insertType == TSDB_QUERY_TYPE_STMT_INSERT) {
-      STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
+      STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
       code = tscGetTableMeta(pSql, pTableMetaInfo);
       if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
         taosReleaseRef(tscObjRef, pSql->self);
         return;
       } else {
-        assert(code == TSDB_CODE_SUCCESS);      
+        assert(code == TSDB_CODE_SUCCESS);
       }
 
       (*pSql->fp)(pSql->param, pSql, code);
     } else if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
-      tscHandleMultivnodeInsert(pSql);
+      if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
+        tscImportDataFromFile(pSql);
+      } else {
+        tscHandleMultivnodeInsert(pSql);
+      }
     } else {
       SQueryInfo* pQueryInfo1 = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
       executeQuery(pSql, pQueryInfo1);
diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c
index 9cfe7909b4..f75c7f036b 100644
--- a/src/client/src/tscParseInsert.c
+++ b/src/client/src/tscParseInsert.c
@@ -1527,6 +1527,7 @@ void tscImportDataFromFile(SSqlObj *pSql) {
   }
 
   assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE && strlen(pCmd->payload) != 0);
+  pCmd->active = pCmd->pQueryInfo[0];
 
   SImportFileSupport *pSupporter = calloc(1, sizeof(SImportFileSupport));
   SSqlObj *pNew = createSubqueryObj(pSql, 0, parseFileSendDataBlock, pSupporter, TSDB_SQL_INSERT, NULL);
diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c
index 2f304a8fea..86640dfbcd 100644
--- a/src/client/src/tscUtil.c
+++ b/src/client/src/tscUtil.c
@@ -3543,6 +3543,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
   pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo);
   pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo);
   pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo);
+  pQueryAttr->distinctTag = pQueryInfo->distinctTag;
 
   pQueryAttr->numOfCols = numOfCols;
   pQueryAttr->numOfOutput = numOfOutput;
diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h
index c4bbd43296..b9361650e9 100644
--- a/src/query/inc/qExecutor.h
+++ b/src/query/inc/qExecutor.h
@@ -195,6 +195,7 @@ typedef struct SQueryAttr {
   bool simpleAgg;
   bool pointInterpQuery; // point interpolation query
   bool needReverseScan; // need reverse scan
+  bool distinctTag; // distinct tag query
 
   int32_t interBufSize; // intermediate buffer sizse
   int32_t havingNum; // having expr number
@@ -297,9 +298,10 @@ enum OPERATOR_TYPE_E {
   OP_MultiTableAggregate = 14,
   OP_MultiTableTimeInterval = 15,
   OP_DummyInput = 16, //TODO remove it after fully refactor.
-  OP_MultiwaySort = 17, // multi-way data merge into one input stream.
+  OP_MultiwayMergeSort = 17, // multi-way data merge into one input stream.
   OP_GlobalAggregate = 18, // global merge for the multi-way data sources.
-  OP_Filter      = 19,
+  OP_Filter = 19,
+  OP_Distinct = 20,
 };
 
 typedef struct SOperatorInfo {
@@ -463,6 +465,14 @@ typedef struct SSWindowOperatorInfo {
   int32_t start; // start row index
 } SSWindowOperatorInfo;
 
+typedef struct SDistinctOperatorInfo {
+  SHashObj *pSet;
+  SSDataBlock *pRes;
+  bool recordNullVal; // the null value has already been recorded, no need to try again
+  int64_t threshold;
+  int64_t outputCapacity;
+} SDistinctOperatorInfo;
+
 struct SLocalMerger;
 
 typedef struct SMultiwayMergeInfo {
@@ -498,6 +508,7 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
 SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
 SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
 SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput);
+SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
 SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
 SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
                                               int32_t numOfRows, void* merger, bool groupMix);
diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c
index c4e0383f35..4ef45f7a7c 100644
--- a/src/query/src/qExecutor.c
+++ b/src/query/src/qExecutor.c
@@ -1780,7 +1780,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
         break;
       }
 
-      case OP_MultiwaySort: {
+      case OP_MultiwayMergeSort: {
         bool groupMix = true;
         if(pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) {
           groupMix = false;
@@ -1791,15 +1791,19 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
       }
 
       case OP_GlobalAggregate: {
-        pRuntimeEnv->proot =
-            createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
-                                              pQueryAttr->numOfExpr3, merger);
+        pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
+                                                               pQueryAttr->numOfExpr3, merger);
         break;
       }
 
       case OP_SLimit: {
         pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
-                                                               pQueryAttr->numOfExpr3, merger);
+                                                      pQueryAttr->numOfExpr3, merger);
+        break;
+      }
+
+      case OP_Distinct: {
+        pRuntimeEnv->proot = createDistinctOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
         break;
       }
 
@@ -4621,7 +4625,7 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
 
   SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
   pOperator->name = "MultiwaySortOperator";
-  pOperator->operatorType = OP_MultiwaySort;
+  pOperator->operatorType = OP_MultiwayMergeSort;
   pOperator->blockingOptr = false;
   pOperator->status = OP_IN_EXECUTING;
   pOperator->info = pInfo;
@@ -5338,6 +5342,12 @@ static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
   doDestroyFilterInfo(pInfo->pFilterInfo, pInfo->numOfFilterCols);
 }
 
+static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
+  SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*) param;
+  taosHashCleanup(pInfo->pSet);
+  pInfo->pRes = destroyOutputBuf(pInfo->pRes);
+}
+
 SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
   SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
 
@@ -5641,7 +5651,6 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
   return pOperator;
 }
 
-
 static SSDataBlock* doTagScan(void* param, bool* newgroup) {
   SOperatorInfo* pOperator = (SOperatorInfo*) param;
   if (pOperator->status == OP_EXEC_DONE) {
@@ -5793,6 +5802,94 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
   return pOperator;
 }
 
+// Emit each non-NULL value of the single input column the first time it is seen.
+// Rows are gathered across upstream blocks until pInfo->threshold is reached or the
+// upstream is exhausted; NULL is returned when a call has nothing to emit.
+static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
+  SOperatorInfo* pOperator = (SOperatorInfo*) param;
+  if (pOperator->status == OP_EXEC_DONE) {
+    return NULL;
+  }
+
+  SDistinctOperatorInfo* pInfo = pOperator->info;
+  SSDataBlock* pRes = pInfo->pRes;
+
+  pRes->info.rows = 0;
+  SSDataBlock* pBlock = NULL;
+  while(1) {
+    pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup);
+    if (pBlock == NULL) {
+      setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
+      pOperator->status = OP_EXEC_DONE;
+      // leave the loop rather than returning NULL, so rows gathered in this call are not lost
+      break;
+    }
+
+    assert(pBlock->info.numOfCols == 1);
+    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0);
+
+    int16_t bytes = pColInfoData->info.bytes;
+    int16_t type = pColInfoData->info.type;
+
+    // ensure the output buffer can hold every row of this block in the worst case
+    SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, 0);
+    if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) {
+      int32_t newSize = pRes->info.rows + pBlock->info.rows;
+      char* tmp = realloc(pResultColInfoData->pData, (size_t)newSize * bytes);
+      if (tmp == NULL) {
+        return NULL;  // NOTE(review): the caller cannot tell this apart from end of data
+      } else {
+        pResultColInfoData->pData = tmp;
+        pInfo->outputCapacity = newSize;
+      }
+    }
+
+    for(int32_t i = 0; i < pBlock->info.rows; ++i) {
+      char* val = ((char*)pColInfoData->pData) + bytes * i;
+      if (isNull(val, type)) {
+        continue;  // NULL values are skipped; pInfo->recordNullVal is not consulted
+      }
+
+      void* res = taosHashGet(pInfo->pSet, val, bytes);
+      if (res == NULL) {
+        taosHashPut(pInfo->pSet, val, bytes, NULL, 0);
+        char* start = pResultColInfoData->pData + bytes * pInfo->pRes->info.rows;
+        memcpy(start, val, bytes);
+        pRes->info.rows += 1;
+      }
+    }
+
+    if (pRes->info.rows >= pInfo->threshold) {
+      break;
+    }
+  }
+
+  return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
+}
+
+// Build the distinct operator on top of upstream; the hash function is chosen from pExpr->base.colType.
+SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
+  SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo));
+
+  pInfo->outputCapacity = 4096;
+  pInfo->threshold = pInfo->outputCapacity;  // must be > 0: a zero threshold ends every call after one block
+  pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(pExpr->base.colType), false, HASH_NO_LOCK);
+  pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pInfo->outputCapacity);
+
+  SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
+  pOperator->name = "DistinctOperator";
+  pOperator->blockingOptr = false;
+  pOperator->status = OP_IN_EXECUTING;
+  pOperator->operatorType = OP_Distinct;
+  pOperator->upstream = upstream;
+  pOperator->numOfOutput = numOfOutput;
+  pOperator->info = pInfo;
+  pOperator->pRuntimeEnv = pRuntimeEnv;
+  pOperator->exec = hashDistinct;
+  pOperator->cleanup = destroyDistinctOperatorInfo;
+  return pOperator;
+}
+
 static int32_t getColumnIndexInSource(SQueriedTableInfo *pTableInfo, SSqlExpr *pExpr, SColumnInfo* pTagCols) {
   int32_t j = 0;
 
diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c
index d62789307e..0554a887ec 100644
--- a/src/query/src/qPlan.c
+++ b/src/query/src/qPlan.c
@@ -49,17 +49,20 @@ SArray* createTableScanPlan(SQueryAttr* pQueryAttr) {
   int32_t op = 0;
   if (onlyQueryTags(pQueryAttr)) {
 //    op = OP_TagScan;
-  } else if (pQueryAttr->queryBlockDist) {
-    op = OP_TableBlockInfoScan;
-  } else if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery) {
-    op = OP_TableSeqScan;
-  } else if (pQueryAttr->needReverseScan) {
-    op = OP_DataBlocksOptScan;
   } else {
-    op = OP_TableScan;
+    if (pQueryAttr->queryBlockDist) {
+      op = OP_TableBlockInfoScan;
+    } else if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery) {
+      op = OP_TableSeqScan;
+    } else if (pQueryAttr->needReverseScan) {
+      op = OP_DataBlocksOptScan;
+    } else {
+      op = OP_TableScan;
+    }
+
+    taosArrayPush(plan, &op);
   }
 
-  taosArrayPush(plan, &op);
   return plan;
 }
 
@@ -70,6 +73,10 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
   if (onlyQueryTags(pQueryAttr)) {  // do nothing for tags query
     op = OP_TagScan;
     taosArrayPush(plan, &op);
+    if (pQueryAttr->distinctTag) {
+      op = OP_Distinct;
+      taosArrayPush(plan, &op);
+    }
   } else if (pQueryAttr->interval.interval > 0) {
     if (pQueryAttr->stableQuery) {
       op = OP_MultiTableTimeInterval;
@@ -148,10 +155,14 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) {
     return plan;
   }
 
-  // todo:
-  int32_t op = OP_MultiwaySort;
+  int32_t op = OP_MultiwayMergeSort;
   taosArrayPush(plan, &op);
 
+  if (pQueryAttr->distinctTag) {
+    op = OP_Distinct;
+    taosArrayPush(plan, &op);
+  }
+
   if (pQueryAttr->simpleAgg || (pQueryAttr->interval.interval > 0 || pQueryAttr->sw.gap > 0)) {
     op = OP_GlobalAggregate;
     taosArrayPush(plan, &op);
diff --git a/tests/script/general/parser/testSuite.sim b/tests/script/general/parser/testSuite.sim
index 889f0043df..c6d85bb6ff 100644
--- a/tests/script/general/parser/testSuite.sim
+++ b/tests/script/general/parser/testSuite.sim
@@ -17,7 +17,7 @@ run general/parser/first_last.sim
 run general/parser/import_commit1.sim
 run general/parser/import_commit2.sim
 run general/parser/import_commit3.sim
-#run general/parser/import_file.sim
+run general/parser/import_file.sim
 run general/parser/insert_tb.sim
 run general/parser/tags_dynamically_specifiy.sim
 run general/parser/interp.sim
@@ -43,6 +43,7 @@ run general/parser/join_multivnode.sim
 run general/parser/join_manyblocks.sim
 run general/parser/projection_limit_offset.sim
 run general/parser/select_with_tags.sim
+run general/parser/select_distinct_tag.sim
 run general/parser/groupby.sim
 run general/parser/tags_filter.sim
 run general/parser/topbot.sim
-- 
GitLab