提交 341a46e4 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/row_refact

......@@ -1087,7 +1087,7 @@ void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUc
tsem_wait(&pParam->sem);
}
if (pRequest->code == TSDB_CODE_SUCCESS && setupOneRowPtr) {
if (pRequest->code == TSDB_CODE_SUCCESS && pResultInfo->numOfRows > 0 && setupOneRowPtr) {
doSetOneRowPtr(pResultInfo);
pResultInfo->current += 1;
}
......
......@@ -328,6 +328,18 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
CTG_ERR_JRET(ctgInitGetQnodeTask(pJob, taskIdx++));
}
pJob->refId = taosAddRef(gCtgMgmt.jobPool, pJob);
if (pJob->refId < 0) {
ctgError("add job to ref failed, error: %s", tstrerror(terrno));
CTG_ERR_JRET(terrno);
}
taosAcquireRef(gCtgMgmt.jobPool, pJob->refId);
qDebug("QID:%" PRIx64 ", job %" PRIx64 " initialized, task num %d", pJob->queryId, pJob->refId, *taskNum);
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFreeClear(*job);
CTG_RET(code);
......
......@@ -226,6 +226,9 @@ static SColumnNode* createColumnByFunc(const SFunctionNode* pFunc) {
}
bool fmIsDistExecFunc(int32_t funcId) {
if (fmIsUserDefinedFunc(funcId)) {
return false;
}
if (!fmIsVectorFunc(funcId)) {
return true;
}
......
......@@ -199,6 +199,22 @@ static int32_t collectMetaKeyFromCreateMultiTable(SCollectMetaKeyCxt* pCxt, SCre
return code;
}
static int32_t collectMetaKeyFromDropTable(SCollectMetaKeyCxt* pCxt, SDropTableStmt* pStmt) {
int32_t code = TSDB_CODE_SUCCESS;
SNode* pNode = NULL;
FOREACH(pNode, pStmt->pTables) {
SDropTableClause* pClause = (SDropTableClause*)pNode;
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, pCxt->pMetaCache);
}
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
return code;
}
static int32_t collectMetaKeyFromAlterTable(SCollectMetaKeyCxt* pCxt, SAlterTableStmt* pStmt) {
int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
......@@ -341,6 +357,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromCreateTable(pCxt, (SCreateTableStmt*)pStmt);
case QUERY_NODE_CREATE_MULTI_TABLE_STMT:
return collectMetaKeyFromCreateMultiTable(pCxt, (SCreateMultiTableStmt*)pStmt);
case QUERY_NODE_DROP_TABLE_STMT:
return collectMetaKeyFromDropTable(pCxt, (SDropTableStmt*)pStmt);
case QUERY_NODE_ALTER_TABLE_STMT:
return collectMetaKeyFromAlterTable(pCxt, (SAlterTableStmt*)pStmt);
case QUERY_NODE_USE_DATABASE_STMT:
......
......@@ -24,7 +24,7 @@ class ParserInitialDTest : public ParserDdlTest {};
// todo delete
// todo desc
// todo describe
// todo drop account
// todo DROP account
TEST_F(ParserInitialDTest, dropBnode) {
useDb("root", "test");
......@@ -62,51 +62,61 @@ TEST_F(ParserInitialDTest, dropCGroup) {
run("DROP CONSUMER GROUP IF EXISTS cg1 ON tp1");
}
// todo drop database
// todo drop dnode
// todo drop function
// todo DROP database
// todo DROP dnode
// todo DROP function
TEST_F(ParserInitialDTest, dropIndex) {
useDb("root", "test");
run("drop index index1 on t1");
run("DROP index index1 on t1");
}
TEST_F(ParserInitialDTest, dropMnode) {
useDb("root", "test");
run("drop mnode on dnode 1");
run("DROP mnode on dnode 1");
}
TEST_F(ParserInitialDTest, dropQnode) {
useDb("root", "test");
run("drop qnode on dnode 1");
run("DROP qnode on dnode 1");
}
TEST_F(ParserInitialDTest, dropSnode) {
useDb("root", "test");
run("drop snode on dnode 1");
run("DROP snode on dnode 1");
}
// todo drop stable
// todo drop stream
// todo drop table
TEST_F(ParserInitialDTest, dropSTable) {
useDb("root", "test");
run("DROP STABLE st1");
}
// todo DROP stream
TEST_F(ParserInitialDTest, dropTable) {
useDb("root", "test");
run("DROP TABLE t1");
}
TEST_F(ParserInitialDTest, dropTopic) {
useDb("root", "test");
run("drop topic tp1");
run("DROP topic tp1");
run("drop topic if exists tp1");
run("DROP topic if exists tp1");
}
TEST_F(ParserInitialDTest, dropUser) {
login("root");
useDb("root", "test");
run("drop user wxy");
run("DROP user wxy");
}
} // namespace ParserTest
......@@ -151,8 +151,8 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) {
static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
switch (nodeType(pNode)) {
// case QUERY_NODE_LOGIC_PLAN_AGG:
// return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(pNode);
case QUERY_NODE_LOGIC_PLAN_AGG:
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_WINDOW: {
SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode;
if (WINDOW_TYPE_INTERVAL != pWindow->winType) {
......@@ -161,7 +161,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
}
// case QUERY_NODE_LOGIC_PLAN_SORT:
// return stbSplHasMultiTbScan(pNode);
// return stbSplHasMultiTbScan(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_SCAN:
return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
default:
......@@ -295,7 +295,8 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic
return code;
}
static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, SLogicNode* pPartChild) {
static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, SNodeList* pMergeKeys,
SLogicNode* pPartChild) {
SMergeLogicNode* pMerge = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE);
if (NULL == pMerge) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -304,25 +305,28 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, S
pMerge->srcGroupId = pCxt->groupId;
pMerge->node.pParent = pParent;
pMerge->node.precision = pPartChild->precision;
int32_t code = nodesListMakeStrictAppend(&pMerge->pMergeKeys, nodesCloneNode(((SWindowLogicNode*)pParent)->pTspk));
if (TSDB_CODE_SUCCESS == code) {
pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets);
if (NULL == pMerge->node.pTargets) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeAppend(&pParent->pChildren, pMerge);
pMerge->pMergeKeys = pMergeKeys;
pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets);
if (NULL == pMerge->node.pTargets) {
nodesDestroyNode(pMerge);
return TSDB_CODE_OUT_OF_MEMORY;
}
return code;
return nodesListMakeAppend(&pParent->pChildren, pMerge);
}
static int32_t stbSplSplitWindowNodeForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pPartWindow);
SNodeList* pMergeKeys = NULL;
code = nodesListMakeStrictAppend(&pMergeKeys, nodesCloneNode(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk));
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pMergeKeys, pPartWindow);
}
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pMergeKeys);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
......@@ -365,6 +369,124 @@ static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInf
}
}
static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) {
SNodeList* pFunc = pMergeAgg->pAggFuncs;
pMergeAgg->pAggFuncs = NULL;
SNodeList* pGroupKeys = pMergeAgg->pGroupKeys;
pMergeAgg->pGroupKeys = NULL;
SNodeList* pTargets = pMergeAgg->node.pTargets;
pMergeAgg->node.pTargets = NULL;
SNodeList* pChildren = pMergeAgg->node.pChildren;
pMergeAgg->node.pChildren = NULL;
int32_t code = TSDB_CODE_SUCCESS;
SAggLogicNode* pPartAgg = nodesCloneNode(pMergeAgg);
if (NULL == pPartAgg) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
pPartAgg->pGroupKeys = pGroupKeys;
code = createColumnByRewriteExps(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
pMergeAgg->pGroupKeys = nodesCloneList(pPartAgg->node.pTargets);
if (NULL == pMergeAgg->pGroupKeys) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
pMergeAgg->node.pTargets = pTargets;
pPartAgg->node.pChildren = pChildren;
code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, &pMergeAgg->pAggFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets);
}
nodesDestroyList(pFunc);
if (TSDB_CODE_SUCCESS == code) {
*pOutput = (SLogicNode*)pPartAgg;
} else {
nodesDestroyNode(pPartAgg);
}
return code;
}
static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartAgg = NULL;
int32_t code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT));
}
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
return code;
}
static int32_t stbSplCreatePartSortNode(SSortLogicNode* pMergeSort, SLogicNode** pOutput) {
SNodeList* pSortKeys = pMergeSort->pSortKeys;
pMergeSort->pSortKeys = NULL;
SNodeList* pTargets = pMergeSort->node.pTargets;
pMergeSort->node.pTargets = NULL;
SNodeList* pChildren = pMergeSort->node.pChildren;
pMergeSort->node.pChildren = NULL;
int32_t code = TSDB_CODE_SUCCESS;
SSortLogicNode* pPartSort = nodesCloneNode(pMergeSort);
if (NULL == pPartSort) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
pMergeSort->node.pTargets = pTargets;
pPartSort->node.pChildren = pChildren;
if (TSDB_CODE_SUCCESS == code) {
pPartSort->pSortKeys = pSortKeys;
code = createColumnByRewriteExps(pPartSort->pSortKeys, &pPartSort->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
pMergeSort->pSortKeys = nodesCloneList(pPartSort->node.pTargets);
if (NULL == pMergeSort->pSortKeys) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
*pOutput = (SLogicNode*)pPartSort;
} else {
nodesDestroyNode(pPartSort);
}
return code;
}
static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartSort = NULL;
int32_t code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort);
if (TSDB_CODE_SUCCESS == code) {
SNodeList* pMergeKeys = nodesCloneList(((SSortLogicNode*)pInfo->pSplitNode)->pSortKeys);
if (NULL != pMergeKeys) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pMergeKeys, pPartSort);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pMergeKeys);
}
} else {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
splCreateScanSubplan(pCxt, pPartSort, SPLIT_FLAG_STABLE_SPLIT));
}
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
return code;
}
static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE);
if (TSDB_CODE_SUCCESS == code) {
......@@ -386,9 +508,15 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(info.pSplitNode)) {
case QUERY_NODE_LOGIC_PLAN_AGG:
code = stbSplSplitAggNode(pCxt, &info);
break;
case QUERY_NODE_LOGIC_PLAN_WINDOW:
code = stbSplSplitWindowNode(pCxt, &info);
break;
case QUERY_NODE_LOGIC_PLAN_SORT:
code = stbSplSplitSortNode(pCxt, &info);
break;
case QUERY_NODE_LOGIC_PLAN_SCAN:
code = stbSplSplitScanNode(pCxt, &info);
break;
......
......@@ -67,3 +67,11 @@ TEST_F(PlanGroupByTest, selectFunc) {
run("SELECT MAX(c1), c2 FROM t1 GROUP BY c3");
run("SELECT MAX(c1), t1.* FROM t1 GROUP BY c3");
}
TEST_F(PlanGroupByTest, stable) {
useDb("root", "test");
run("SELECT COUNT(*) FROM st1");
run("SELECT COUNT(*) FROM st1 GROUP BY c1");
}
......@@ -23,20 +23,27 @@ class PlanOrderByTest : public PlannerTestBase {};
TEST_F(PlanOrderByTest, basic) {
useDb("root", "test");
// order by key is in the projection list
run("select c1 from t1 order by c1");
// order by key is not in the projection list
run("select c1 from t1 order by c2");
// ORDER BY key is in the projection list
run("SELECT c1 FROM t1 ORDER BY c1");
// ORDER BY key is not in the projection list
run("SELECT c1 FROM t1 ORDER BY c2");
}
TEST_F(PlanOrderByTest, expr) {
useDb("root", "test");
run("select * from t1 order by c1 + 10, c2");
run("SELECT * FROM t1 ORDER BY c1 + 10, c2");
}
TEST_F(PlanOrderByTest, nullsOrder) {
useDb("root", "test");
run("select * from t1 order by c1 desc nulls first");
run("SELECT * FROM t1 ORDER BY c1 DESC NULLS FIRST");
}
TEST_F(PlanOrderByTest, stable) {
useDb("root", "test");
// ORDER BY key is in the projection list
run("SELECT c1 FROM st1 ORDER BY c1");
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册