提交 b445afb5 编写于 作者: X Xiaoyu Wang

feat: stable interval split

上级 0b99a1b3
......@@ -188,8 +188,8 @@ int32_t __catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* ve
}
int32_t __catalogGetDBVgInfo(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, const char* dbFName,
SArray** vgroupList) {
return 0;
SArray** pVgList) {
return g_mockCatalogService->catalogGetDBVgInfo(dbFName, pVgList);
}
int32_t __catalogGetDBCfg(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg) {
......
......@@ -18,6 +18,7 @@
#include <iomanip>
#include <iostream>
#include <map>
#include <set>
#include "tdatablock.h"
#include "tname.h"
......@@ -120,6 +121,25 @@ class MockCatalogServiceImpl {
return copyTableVgroup(db, tNameGetTableName(pTableName), vgList);
}
int32_t catalogGetDBVgInfo(const char* pDbFName, SArray** pVgList) const {
std::string dbFName(pDbFName);
DbMetaCache::const_iterator it = meta_.find(dbFName.substr(std::string(pDbFName).find_last_of('.') + 1));
if (meta_.end() == it) {
return TSDB_CODE_FAILED;
}
std::set<int32_t> vgSet;
*pVgList = taosArrayInit(it->second.size(), sizeof(SVgroupInfo));
for (const auto& vgs : it->second) {
for (const auto& vg : vgs.second->vgs) {
if (0 == vgSet.count(vg.vgId)) {
taosArrayPush(*pVgList, &vg);
vgSet.insert(vg.vgId);
}
}
}
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const {
auto it = udf_.find(funcName);
if (udf_.end() == it) {
......@@ -187,8 +207,9 @@ class MockCatalogServiceImpl {
// number of backward fills
#define NOB(n) ((n) % 2 ? (n) / 2 + 1 : (n) / 2)
// center aligned
#define CA(n, s) std::setw(NOF((n) - int((s).length()))) << "" << (s) \
<< std::setw(NOB((n) - int((s).length()))) << "" << "|"
#define CA(n, s) \
std::setw(NOF((n) - int((s).length()))) << "" << (s) << std::setw(NOB((n) - int((s).length()))) << "" \
<< "|"
// string field length
#define SFL 20
// string field header
......@@ -490,6 +511,10 @@ int32_t MockCatalogService::catalogGetTableDistVgInfo(const SName* pTableName, S
return impl_->catalogGetTableDistVgInfo(pTableName, pVgList);
}
int32_t MockCatalogService::catalogGetDBVgInfo(const char* pDbFName, SArray** pVgList) const {
return impl_->catalogGetDBVgInfo(pDbFName, pVgList);
}
int32_t MockCatalogService::catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const {
return impl_->catalogGetUdfInfo(funcName, pInfo);
}
......
......@@ -61,6 +61,7 @@ class MockCatalogService {
int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const;
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const;
int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const;
int32_t catalogGetDBVgInfo(const char* pDbFName, SArray** pVgList) const;
int32_t catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const;
int32_t catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const;
......
......@@ -17,7 +17,7 @@
#define SPLIT_FLAG_MASK(n) (1 << n)
#define SPLIT_FLAG_STS SPLIT_FLAG_MASK(0)
#define SPLIT_FLAG_STABLE_SPLIT SPLIT_FLAG_MASK(0)
#define SPLIT_FLAG_SET_MASK(val, mask) (val) |= (mask)
#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
......@@ -35,27 +35,6 @@ typedef struct SSplitRule {
FSplit splitFunc;
} SSplitRule;
typedef struct SStsInfo {
SScanLogicNode* pScan;
SLogicSubplan* pSubplan;
} SStsInfo;
typedef struct SCtjInfo {
SJoinLogicNode* pJoin;
SLogicNode* pSplitNode;
SLogicSubplan* pSubplan;
} SCtjInfo;
typedef struct SUaInfo {
SProjectLogicNode* pProject;
SLogicSubplan* pSubplan;
} SUaInfo;
typedef struct SUnInfo {
SAggLogicNode* pAgg;
SLogicSubplan* pSubplan;
} SUnInfo;
typedef bool (*FSplFindSplitNode)(SLogicSubplan* pSubplan, void* pInfo);
static SLogicSubplan* splCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode, int32_t flag) {
......@@ -121,14 +100,19 @@ static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag,
return false;
}
static SLogicNode* stsMatchByNode(SLogicNode* pNode) {
typedef struct SStableSplitInfo {
SScanLogicNode* pScan;
SLogicSubplan* pSubplan;
} SStableSplitInfo;
static SLogicNode* stbSplMatchByNode(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && NULL != ((SScanLogicNode*)pNode)->pVgroupList &&
((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups > 1) {
return pNode;
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
SLogicNode* pSplitNode = stsMatchByNode((SLogicNode*)pChild);
SLogicNode* pSplitNode = stbSplMatchByNode((SLogicNode*)pChild);
if (NULL != pSplitNode) {
return pSplitNode;
}
......@@ -136,8 +120,8 @@ static SLogicNode* stsMatchByNode(SLogicNode* pNode) {
return NULL;
}
static bool stsFindSplitNode(SLogicSubplan* pSubplan, SStsInfo* pInfo) {
SLogicNode* pSplitNode = stsMatchByNode(pSubplan->pNode);
static bool stbSplFindSplitNode(SLogicSubplan* pSubplan, SStableSplitInfo* pInfo) {
SLogicNode* pSplitNode = stbSplMatchByNode(pSubplan->pNode);
if (NULL != pSplitNode) {
pInfo->pScan = (SScanLogicNode*)pSplitNode;
pInfo->pSubplan = pSubplan;
......@@ -145,13 +129,13 @@ static bool stsFindSplitNode(SLogicSubplan* pSubplan, SStsInfo* pInfo) {
return NULL != pSplitNode;
}
static int32_t stsSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
SStsInfo info = {0};
if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_STS, (FSplFindSplitNode)stsFindSplitNode, &info)) {
static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
SStableSplitInfo info = {0};
if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_STABLE_SPLIT, (FSplFindSplitNode)stbSplFindSplitNode, &info)) {
return TSDB_CODE_SUCCESS;
}
int32_t code = nodesListMakeStrictAppend(&info.pSubplan->pChildren,
splCreateSubplan(pCxt, (SLogicNode*)info.pScan, SPLIT_FLAG_STS));
splCreateSubplan(pCxt, (SLogicNode*)info.pScan, SPLIT_FLAG_STABLE_SPLIT));
if (TSDB_CODE_SUCCESS == code) {
code = splCreateExchangeNode(pCxt, info.pSubplan, (SLogicNode*)info.pScan, SUBPLAN_TYPE_MERGE);
}
......@@ -160,7 +144,13 @@ static int32_t stsSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
return code;
}
static bool needSplit(SJoinLogicNode* pJoin) {
typedef struct SSigTbJoinSplitInfo {
SJoinLogicNode* pJoin;
SLogicNode* pSplitNode;
SLogicSubplan* pSubplan;
} SSigTbJoinSplitInfo;
static bool sigTbJoinSplNeedSplit(SJoinLogicNode* pJoin) {
if (!pJoin->isSingleTableJoin) {
return false;
}
......@@ -168,13 +158,13 @@ static bool needSplit(SJoinLogicNode* pJoin) {
QUERY_NODE_LOGIC_PLAN_EXCHANGE != nodeType(nodesListGetNode(pJoin->node.pChildren, 1));
}
static SJoinLogicNode* ctjMatchByNode(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pNode) && needSplit((SJoinLogicNode*)pNode)) {
static SJoinLogicNode* sigTbJoinSplMatchByNode(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pNode) && sigTbJoinSplNeedSplit((SJoinLogicNode*)pNode)) {
return (SJoinLogicNode*)pNode;
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
SJoinLogicNode* pSplitNode = ctjMatchByNode((SLogicNode*)pChild);
SJoinLogicNode* pSplitNode = sigTbJoinSplMatchByNode((SLogicNode*)pChild);
if (NULL != pSplitNode) {
return pSplitNode;
}
......@@ -182,8 +172,8 @@ static SJoinLogicNode* ctjMatchByNode(SLogicNode* pNode) {
return NULL;
}
static bool ctjFindSplitNode(SLogicSubplan* pSubplan, SCtjInfo* pInfo) {
SJoinLogicNode* pJoin = ctjMatchByNode(pSubplan->pNode);
static bool sigTbJoinSplFindSplitNode(SLogicSubplan* pSubplan, SSigTbJoinSplitInfo* pInfo) {
SJoinLogicNode* pJoin = sigTbJoinSplMatchByNode(pSubplan->pNode);
if (NULL != pJoin) {
pInfo->pJoin = pJoin;
pInfo->pSplitNode = nodesListGetNode(pJoin->node.pChildren, 1);
......@@ -192,9 +182,9 @@ static bool ctjFindSplitNode(SLogicSubplan* pSubplan, SCtjInfo* pInfo) {
return NULL != pJoin;
}
static int32_t ctjSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
SCtjInfo info = {0};
if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)ctjFindSplitNode, &info)) {
static int32_t singleTableJoinSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
SSigTbJoinSplitInfo info = {0};
if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)sigTbJoinSplFindSplitNode, &info)) {
return TSDB_CODE_SUCCESS;
}
int32_t code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, splCreateSubplan(pCxt, info.pSplitNode, 0));
......@@ -277,13 +267,18 @@ static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubpl
return code;
}
static SLogicNode* uaMatchByNode(SLogicNode* pNode) {
typedef struct SUnionAllSplitInfo {
SProjectLogicNode* pProject;
SLogicSubplan* pSubplan;
} SUnionAllSplitInfo;
static SLogicNode* unionAllMatchByNode(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
return pNode;
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
SLogicNode* pSplitNode = uaMatchByNode((SLogicNode*)pChild);
SLogicNode* pSplitNode = unionAllMatchByNode((SLogicNode*)pChild);
if (NULL != pSplitNode) {
return pSplitNode;
}
......@@ -291,8 +286,8 @@ static SLogicNode* uaMatchByNode(SLogicNode* pNode) {
return NULL;
}
static bool uaFindSplitNode(SLogicSubplan* pSubplan, SUaInfo* pInfo) {
SLogicNode* pSplitNode = uaMatchByNode(pSubplan->pNode);
static bool unionAllFindSplitNode(SLogicSubplan* pSubplan, SUnionAllSplitInfo* pInfo) {
SLogicNode* pSplitNode = unionAllMatchByNode(pSubplan->pNode);
if (NULL != pSplitNode) {
pInfo->pProject = (SProjectLogicNode*)pSplitNode;
pInfo->pSubplan = pSubplan;
......@@ -300,13 +295,13 @@ static bool uaFindSplitNode(SLogicSubplan* pSubplan, SUaInfo* pInfo) {
return NULL != pSplitNode;
}
static int32_t uaCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SProjectLogicNode* pProject) {
static int32_t unionAllCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SProjectLogicNode* pProject) {
SExchangeLogicNode* pExchange = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
if (NULL == pExchange) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pExchange->srcGroupId = pCxt->groupId;
// pExchange->precision = pScan->pMeta->tableInfo.precision;
pExchange->precision = pProject->node.precision;
pExchange->node.pTargets = nodesCloneList(pProject->node.pTargets);
if (NULL == pExchange->node.pTargets) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -332,28 +327,33 @@ static int32_t uaCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan
return TSDB_CODE_FAILED;
}
static int32_t uaSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
SUaInfo info = {0};
if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)uaFindSplitNode, &info)) {
static int32_t unionAllSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
SUnionAllSplitInfo info = {0};
if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unionAllFindSplitNode, &info)) {
return TSDB_CODE_SUCCESS;
}
int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pProject);
if (TSDB_CODE_SUCCESS == code) {
code = uaCreateExchangeNode(pCxt, info.pSubplan, info.pProject);
code = unionAllCreateExchangeNode(pCxt, info.pSubplan, info.pProject);
}
++(pCxt->groupId);
pCxt->split = true;
return code;
}
static SLogicNode* unMatchByNode(SLogicNode* pNode) {
typedef struct SUnionDistinctSplitInfo {
SAggLogicNode* pAgg;
SLogicSubplan* pSubplan;
} SUnionDistinctSplitInfo;
static SLogicNode* unionDistinctMatchByNode(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
return pNode;
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
SLogicNode* pSplitNode = unMatchByNode((SLogicNode*)pChild);
SLogicNode* pSplitNode = unionDistinctMatchByNode((SLogicNode*)pChild);
if (NULL != pSplitNode) {
return pSplitNode;
}
......@@ -378,8 +378,8 @@ static int32_t unCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan
return nodesListMakeAppend(&pAgg->node.pChildren, pExchange);
}
static bool unFindSplitNode(SLogicSubplan* pSubplan, SUnInfo* pInfo) {
SLogicNode* pSplitNode = unMatchByNode(pSubplan->pNode);
static bool unionDistinctFindSplitNode(SLogicSubplan* pSubplan, SUnionDistinctSplitInfo* pInfo) {
SLogicNode* pSplitNode = unionDistinctMatchByNode(pSubplan->pNode);
if (NULL != pSplitNode) {
pInfo->pAgg = (SAggLogicNode*)pSplitNode;
pInfo->pSubplan = pSubplan;
......@@ -387,9 +387,9 @@ static bool unFindSplitNode(SLogicSubplan* pSubplan, SUnInfo* pInfo) {
return NULL != pSplitNode;
}
static int32_t unSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
SUnInfo info = {0};
if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unFindSplitNode, &info)) {
static int32_t unionDistinctSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
SUnionDistinctSplitInfo info = {0};
if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unionDistinctFindSplitNode, &info)) {
return TSDB_CODE_SUCCESS;
}
......@@ -402,10 +402,14 @@ static int32_t unSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
return code;
}
static const SSplitRule splitRuleSet[] = {{.pName = "SuperTableScan", .splitFunc = stsSplit},
{.pName = "ChildTableJoin", .splitFunc = ctjSplit},
{.pName = "UnionAll", .splitFunc = uaSplit},
{.pName = "Union", .splitFunc = unSplit}};
// clang-format off
static const SSplitRule splitRuleSet[] = {
{.pName = "SuperTableSplit", .splitFunc = stableSplit},
{.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit},
{.pName = "UnionAllSplit", .splitFunc = unionAllSplit},
{.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit}
};
// clang-format on
static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));
......
......@@ -50,4 +50,10 @@ TEST_F(PlanIntervalTest, selectFunc) {
run("SELECT MAX(c1), MIN(c1) FROM t1 INTERVAL(10s)");
// select function along with the columns of select row, and with INTERVAL clause
run("SELECT MAX(c1), c2 FROM t1 INTERVAL(10s)");
}
\ No newline at end of file
}
TEST_F(PlanIntervalTest, stable) {
useDb("root", "test");
run("SELECT COUNT(*) FROM st1 INTERVAL(10s)");
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册