diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 774ef5f2483573376121b7cf910f1f7eb1ff689b..4c68edaff0aaaa93824b5010e6fbfca192b26a44 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1087,7 +1087,7 @@ void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUc tsem_wait(&pParam->sem); } - if (pRequest->code == TSDB_CODE_SUCCESS && setupOneRowPtr) { + if (pRequest->code == TSDB_CODE_SUCCESS && pResultInfo->numOfRows > 0 && setupOneRowPtr) { doSetOneRowPtr(pResultInfo); pResultInfo->current += 1; } diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 312f0c9250bf405b5ec67f18c1d34b51fc244aff..a9e808e7490a652da5902c94fcdd24a2abbe5bc1 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -328,6 +328,18 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* CTG_ERR_JRET(ctgInitGetQnodeTask(pJob, taskIdx++)); } + pJob->refId = taosAddRef(gCtgMgmt.jobPool, pJob); + if (pJob->refId < 0) { + ctgError("add job to ref failed, error: %s", tstrerror(terrno)); + CTG_ERR_JRET(terrno); + } + + taosAcquireRef(gCtgMgmt.jobPool, pJob->refId); + + qDebug("QID:%" PRIx64 ", job %" PRIx64 " initialized, task num %d", pJob->queryId, pJob->refId, *taskNum); + return TSDB_CODE_SUCCESS; + + _return: taosMemoryFreeClear(*job); CTG_RET(code); diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 611ae8d81fdc681c28936456b5b46c0a7e09d4c0..67efdc5c0b91266500e3146b3c2d76350f28087d 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -226,6 +226,9 @@ static SColumnNode* createColumnByFunc(const SFunctionNode* pFunc) { } bool fmIsDistExecFunc(int32_t funcId) { + if (fmIsUserDefinedFunc(funcId)) { + return false; + } if (!fmIsVectorFunc(funcId)) { return true; } diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index 18c1341da86bb2306cc6cb2e5b1036e9bbba7a8e..7abe2442610bbe85e823fb3fb85843351f8e52e8 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -199,6 +199,22 @@ static int32_t collectMetaKeyFromCreateMultiTable(SCollectMetaKeyCxt* pCxt, SCre return code; } +static int32_t collectMetaKeyFromDropTable(SCollectMetaKeyCxt* pCxt, SDropTableStmt* pStmt) { + int32_t code = TSDB_CODE_SUCCESS; + SNode* pNode = NULL; + FOREACH(pNode, pStmt->pTables) { + SDropTableClause* pClause = (SDropTableClause*)pNode; + code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, pCxt->pMetaCache); + if (TSDB_CODE_SUCCESS == code) { + code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, pCxt->pMetaCache); + } + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + return code; +} + static int32_t collectMetaKeyFromAlterTable(SCollectMetaKeyCxt* pCxt, SAlterTableStmt* pStmt) { int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache); if (TSDB_CODE_SUCCESS == code) { @@ -341,6 +357,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { return collectMetaKeyFromCreateTable(pCxt, (SCreateTableStmt*)pStmt); case QUERY_NODE_CREATE_MULTI_TABLE_STMT: return collectMetaKeyFromCreateMultiTable(pCxt, (SCreateMultiTableStmt*)pStmt); + case QUERY_NODE_DROP_TABLE_STMT: + return collectMetaKeyFromDropTable(pCxt, (SDropTableStmt*)pStmt); case QUERY_NODE_ALTER_TABLE_STMT: return collectMetaKeyFromAlterTable(pCxt, (SAlterTableStmt*)pStmt); case QUERY_NODE_USE_DATABASE_STMT: diff --git a/source/libs/parser/test/parInitialDTest.cpp b/source/libs/parser/test/parInitialDTest.cpp index 5ad427d964ad1dc47a4fed64b51f89257ae53da6..1b1b930851880c86282584df57cb32ca4bef9f95 100644 --- a/source/libs/parser/test/parInitialDTest.cpp +++ b/source/libs/parser/test/parInitialDTest.cpp @@ -24,7 +24,7 @@ class ParserInitialDTest : public ParserDdlTest {}; // todo delete // todo desc // todo describe -// todo drop account +// todo DROP account TEST_F(ParserInitialDTest, dropBnode) { useDb("root", "test"); @@ -62,51 +62,61 @@ TEST_F(ParserInitialDTest, dropCGroup) { run("DROP CONSUMER GROUP IF EXISTS cg1 ON tp1"); } -// todo drop database -// todo drop dnode -// todo drop function +// todo DROP database +// todo DROP dnode +// todo DROP function TEST_F(ParserInitialDTest, dropIndex) { useDb("root", "test"); - run("drop index index1 on t1"); + run("DROP index index1 on t1"); } TEST_F(ParserInitialDTest, dropMnode) { useDb("root", "test"); - run("drop mnode on dnode 1"); + run("DROP mnode on dnode 1"); } TEST_F(ParserInitialDTest, dropQnode) { useDb("root", "test"); - run("drop qnode on dnode 1"); + run("DROP qnode on dnode 1"); } TEST_F(ParserInitialDTest, dropSnode) { useDb("root", "test"); - run("drop snode on dnode 1"); + run("DROP snode on dnode 1"); } -// todo drop stable -// todo drop stream -// todo drop table +TEST_F(ParserInitialDTest, dropSTable) { + useDb("root", "test"); + + run("DROP STABLE st1"); +} + +// todo DROP stream + +TEST_F(ParserInitialDTest, dropTable) { + useDb("root", "test"); + + run("DROP TABLE t1"); +} TEST_F(ParserInitialDTest, dropTopic) { useDb("root", "test"); - run("drop topic tp1"); + run("DROP topic tp1"); - run("drop topic if exists tp1"); + run("DROP topic if exists tp1"); } TEST_F(ParserInitialDTest, dropUser) { login("root"); useDb("root", "test"); - run("drop user wxy"); + run("DROP user wxy"); } } // namespace ParserTest diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index e3c8b82e3988bd7943815a645332920f9d31d08b..a3d508690880c1d229c65137014f87d8efa733a9 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -151,8 +151,8 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) { static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { switch (nodeType(pNode)) { - // case QUERY_NODE_LOGIC_PLAN_AGG: - // return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(pNode); + case QUERY_NODE_LOGIC_PLAN_AGG: + return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_WINDOW: { SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode; if (WINDOW_TYPE_INTERVAL != pWindow->winType) { @@ -161,7 +161,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); } // case QUERY_NODE_LOGIC_PLAN_SORT: - // return stbSplHasMultiTbScan(pNode); + // return stbSplHasMultiTbScan(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_SCAN: return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode); default: @@ -295,7 +295,8 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic return code; } -static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, SLogicNode* pPartChild) { +static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, SNodeList* pMergeKeys, + SLogicNode* pPartChild) { SMergeLogicNode* pMerge = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE); if (NULL == pMerge) { return TSDB_CODE_OUT_OF_MEMORY; @@ -304,25 +305,28 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, S pMerge->srcGroupId = pCxt->groupId; pMerge->node.pParent = pParent; pMerge->node.precision = pPartChild->precision; - int32_t code = nodesListMakeStrictAppend(&pMerge->pMergeKeys, nodesCloneNode(((SWindowLogicNode*)pParent)->pTspk)); - if (TSDB_CODE_SUCCESS == code) { - pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets); - if (NULL == pMerge->node.pTargets) { - code = TSDB_CODE_OUT_OF_MEMORY; - } - } - if (TSDB_CODE_SUCCESS == code) { - code = nodesListMakeAppend(&pParent->pChildren, pMerge); + pMerge->pMergeKeys = pMergeKeys; + pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets); + if (NULL == pMerge->node.pTargets) { + nodesDestroyNode(pMerge); + return TSDB_CODE_OUT_OF_MEMORY; } - return code; + return nodesListMakeAppend(&pParent->pChildren, pMerge); } static int32_t stbSplSplitWindowNodeForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) { SLogicNode* pPartWindow = NULL; int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow); if (TSDB_CODE_SUCCESS == code) { - code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pPartWindow); + SNodeList* pMergeKeys = NULL; + code = nodesListMakeStrictAppend(&pMergeKeys, nodesCloneNode(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk)); + if (TSDB_CODE_SUCCESS == code) { + code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pMergeKeys, pPartWindow); + } + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyList(pMergeKeys); + } } if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, @@ -365,6 +369,124 @@ static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInf } } +static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) { + SNodeList* pFunc = pMergeAgg->pAggFuncs; + pMergeAgg->pAggFuncs = NULL; + SNodeList* pGroupKeys = pMergeAgg->pGroupKeys; + pMergeAgg->pGroupKeys = NULL; + SNodeList* pTargets = pMergeAgg->node.pTargets; + pMergeAgg->node.pTargets = NULL; + SNodeList* pChildren = pMergeAgg->node.pChildren; + pMergeAgg->node.pChildren = NULL; + + int32_t code = TSDB_CODE_SUCCESS; + SAggLogicNode* pPartAgg = nodesCloneNode(pMergeAgg); + if (NULL == pPartAgg) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + + if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) { + pPartAgg->pGroupKeys = pGroupKeys; + code = createColumnByRewriteExps(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets); + } + if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) { + pMergeAgg->pGroupKeys = nodesCloneList(pPartAgg->node.pTargets); + if (NULL == pMergeAgg->pGroupKeys) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + if (TSDB_CODE_SUCCESS == code) { + pMergeAgg->node.pTargets = pTargets; + pPartAgg->node.pChildren = pChildren; + + code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, &pMergeAgg->pAggFuncs); + } + if (TSDB_CODE_SUCCESS == code) { + code = createColumnByRewriteExps(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets); + } + + nodesDestroyList(pFunc); + if (TSDB_CODE_SUCCESS == code) { + *pOutput = (SLogicNode*)pPartAgg; + } else { + nodesDestroyNode(pPartAgg); + } + + return code; +} + +static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + SLogicNode* pPartAgg = NULL; + int32_t code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg); + if (TSDB_CODE_SUCCESS == code) { + code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, + splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT)); + } + pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; + return code; +} + +static int32_t stbSplCreatePartSortNode(SSortLogicNode* pMergeSort, SLogicNode** pOutput) { + SNodeList* pSortKeys = pMergeSort->pSortKeys; + pMergeSort->pSortKeys = NULL; + SNodeList* pTargets = pMergeSort->node.pTargets; + pMergeSort->node.pTargets = NULL; + SNodeList* pChildren = pMergeSort->node.pChildren; + pMergeSort->node.pChildren = NULL; + + int32_t code = TSDB_CODE_SUCCESS; + SSortLogicNode* pPartSort = nodesCloneNode(pMergeSort); + if (NULL == pPartSort) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + + pMergeSort->node.pTargets = pTargets; + pPartSort->node.pChildren = pChildren; + if (TSDB_CODE_SUCCESS == code) { + pPartSort->pSortKeys = pSortKeys; + code = createColumnByRewriteExps(pPartSort->pSortKeys, &pPartSort->node.pTargets); + } + if (TSDB_CODE_SUCCESS == code) { + pMergeSort->pSortKeys = nodesCloneList(pPartSort->node.pTargets); + if (NULL == pMergeSort->pSortKeys) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + + if (TSDB_CODE_SUCCESS == code) { + *pOutput = (SLogicNode*)pPartSort; + } else { + nodesDestroyNode(pPartSort); + } + + return code; +} + +static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + SLogicNode* pPartSort = NULL; + int32_t code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort); + if (TSDB_CODE_SUCCESS == code) { + SNodeList* pMergeKeys = nodesCloneList(((SSortLogicNode*)pInfo->pSplitNode)->pSortKeys); + if (NULL != pMergeKeys) { + code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pMergeKeys, pPartSort); + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyList(pMergeKeys); + } + } else { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, + splCreateScanSubplan(pCxt, pPartSort, SPLIT_FLAG_STABLE_SPLIT)); + } + pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; + return code; +} + static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE); if (TSDB_CODE_SUCCESS == code) { @@ -386,9 +508,15 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { int32_t code = TSDB_CODE_SUCCESS; switch (nodeType(info.pSplitNode)) { + case QUERY_NODE_LOGIC_PLAN_AGG: + code = stbSplSplitAggNode(pCxt, &info); + break; case QUERY_NODE_LOGIC_PLAN_WINDOW: code = stbSplSplitWindowNode(pCxt, &info); break; + case QUERY_NODE_LOGIC_PLAN_SORT: + code = stbSplSplitSortNode(pCxt, &info); + break; case QUERY_NODE_LOGIC_PLAN_SCAN: code = stbSplSplitScanNode(pCxt, &info); break; diff --git a/source/libs/planner/test/planGroupByTest.cpp b/source/libs/planner/test/planGroupByTest.cpp index cf516034707e53a230d4e2e7af6fb81c3b8aaecf..201df2efde6236c5ae68aaedf04625a2c4acda19 100644 --- a/source/libs/planner/test/planGroupByTest.cpp +++ b/source/libs/planner/test/planGroupByTest.cpp @@ -67,3 +67,11 @@ TEST_F(PlanGroupByTest, selectFunc) { run("SELECT MAX(c1), c2 FROM t1 GROUP BY c3"); run("SELECT MAX(c1), t1.* FROM t1 GROUP BY c3"); } + +TEST_F(PlanGroupByTest, stable) { + useDb("root", "test"); + + run("SELECT COUNT(*) FROM st1"); + + run("SELECT COUNT(*) FROM st1 GROUP BY c1"); +} diff --git a/source/libs/planner/test/planOrderByTest.cpp b/source/libs/planner/test/planOrderByTest.cpp index d95d1bdf1d429d9ea6c5e62c5a280dbb5c6de477..ef6f438b55c60f89e96de40c9eeeff71f3f2fcdf 100644 --- a/source/libs/planner/test/planOrderByTest.cpp +++ b/source/libs/planner/test/planOrderByTest.cpp @@ -23,20 +23,27 @@ class PlanOrderByTest : public PlannerTestBase {}; TEST_F(PlanOrderByTest, basic) { useDb("root", "test"); - // order by key is in the projection list - run("select c1 from t1 order by c1"); - // order by key is not in the projection list - run("select c1 from t1 order by c2"); + // ORDER BY key is in the projection list + run("SELECT c1 FROM t1 ORDER BY c1"); + // ORDER BY key is not in the projection list + run("SELECT c1 FROM t1 ORDER BY c2"); } TEST_F(PlanOrderByTest, expr) { useDb("root", "test"); - run("select * from t1 order by c1 + 10, c2"); + run("SELECT * FROM t1 ORDER BY c1 + 10, c2"); } TEST_F(PlanOrderByTest, nullsOrder) { useDb("root", "test"); - run("select * from t1 order by c1 desc nulls first"); + run("SELECT * FROM t1 ORDER BY c1 DESC NULLS FIRST"); +} + +TEST_F(PlanOrderByTest, stable) { + useDb("root", "test"); + + // ORDER BY key is in the projection list + run("SELECT c1 FROM st1 ORDER BY c1"); }