提交 a3343377 编写于 作者: X Xiaoyu Wang

feat: merge and merge scan add group sort field

上级 732a91c9
...@@ -76,8 +76,8 @@ typedef struct SScanLogicNode { ...@@ -76,8 +76,8 @@ typedef struct SScanLogicNode {
int16_t tsColId; int16_t tsColId;
double filesFactor; double filesFactor;
SArray* pSmaIndexes; SArray* pSmaIndexes;
SNodeList* pPartTags; SNodeList* pGroupTags;
bool partSort; bool groupSort;
} SScanLogicNode; } SScanLogicNode;
typedef struct SJoinLogicNode { typedef struct SJoinLogicNode {
...@@ -141,6 +141,7 @@ typedef struct SMergeLogicNode { ...@@ -141,6 +141,7 @@ typedef struct SMergeLogicNode {
SNodeList* pInputs; SNodeList* pInputs;
int32_t numOfChannels; int32_t numOfChannels;
int32_t srcGroupId; int32_t srcGroupId;
bool groupSort;
} SMergeLogicNode; } SMergeLogicNode;
typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW_TYPE_STATE } EWindowType; typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW_TYPE_STATE } EWindowType;
...@@ -284,7 +285,8 @@ typedef struct STableScanPhysiNode { ...@@ -284,7 +285,8 @@ typedef struct STableScanPhysiNode {
double ratio; double ratio;
int32_t dataRequired; int32_t dataRequired;
SNodeList* pDynamicScanFuncs; SNodeList* pDynamicScanFuncs;
SNodeList* pPartitionTags; SNodeList* pGroupTags;
bool groupSort;
int64_t interval; int64_t interval;
int64_t offset; int64_t offset;
int64_t sliding; int64_t sliding;
...@@ -358,6 +360,7 @@ typedef struct SMergePhysiNode { ...@@ -358,6 +360,7 @@ typedef struct SMergePhysiNode {
SNodeList* pTargets; SNodeList* pTargets;
int32_t numOfChannels; int32_t numOfChannels;
int32_t srcGroupId; int32_t srcGroupId;
bool groupSort;
} SMergePhysiNode; } SMergePhysiNode;
typedef struct SWinodwPhysiNode { typedef struct SWinodwPhysiNode {
......
...@@ -2051,7 +2051,7 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle ...@@ -2051,7 +2051,7 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle
qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId); qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pPartitionTags); code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pGroupTags);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2455,7 +2455,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -2455,7 +2455,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
goto _error; goto _error;
} }
if (pTableScanNode->pPartitionTags) { if (pTableScanNode->pGroupTags) {
taosArraySort(pTableListInfo->pTableList, compareTableKeyInfoByGid); taosArraySort(pTableListInfo->pTableList, compareTableKeyInfoByGid);
} }
......
...@@ -351,7 +351,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { ...@@ -351,7 +351,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
COPY_SCALAR_FIELD(watermark); COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(tsColId); COPY_SCALAR_FIELD(tsColId);
COPY_SCALAR_FIELD(filesFactor); COPY_SCALAR_FIELD(filesFactor);
CLONE_NODE_LIST_FIELD(pPartTags); CLONE_NODE_LIST_FIELD(pGroupTags);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -401,6 +401,7 @@ static int32_t logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst ...@@ -401,6 +401,7 @@ static int32_t logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst
CLONE_NODE_LIST_FIELD(pInputs); CLONE_NODE_LIST_FIELD(pInputs);
COPY_SCALAR_FIELD(numOfChannels); COPY_SCALAR_FIELD(numOfChannels);
COPY_SCALAR_FIELD(srcGroupId); COPY_SCALAR_FIELD(srcGroupId);
COPY_SCALAR_FIELD(groupSort);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -501,7 +502,7 @@ static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhy ...@@ -501,7 +502,7 @@ static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhy
COPY_SCALAR_FIELD(ratio); COPY_SCALAR_FIELD(ratio);
COPY_SCALAR_FIELD(dataRequired); COPY_SCALAR_FIELD(dataRequired);
CLONE_NODE_LIST_FIELD(pDynamicScanFuncs); CLONE_NODE_LIST_FIELD(pDynamicScanFuncs);
CLONE_NODE_LIST_FIELD(pPartitionTags); CLONE_NODE_LIST_FIELD(pGroupTags);
COPY_SCALAR_FIELD(interval); COPY_SCALAR_FIELD(interval);
COPY_SCALAR_FIELD(offset); COPY_SCALAR_FIELD(offset);
COPY_SCALAR_FIELD(sliding); COPY_SCALAR_FIELD(sliding);
......
...@@ -541,7 +541,7 @@ static const char* jkScanLogicPlanScanPseudoCols = "ScanPseudoCols"; ...@@ -541,7 +541,7 @@ static const char* jkScanLogicPlanScanPseudoCols = "ScanPseudoCols";
static const char* jkScanLogicPlanTableId = "TableId"; static const char* jkScanLogicPlanTableId = "TableId";
static const char* jkScanLogicPlanTableType = "TableType"; static const char* jkScanLogicPlanTableType = "TableType";
static const char* jkScanLogicPlanTagCond = "TagCond"; static const char* jkScanLogicPlanTagCond = "TagCond";
static const char* jkScanLogicPlanPartTags = "PartTags"; static const char* jkScanLogicPlanGroupTags = "GroupTags";
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj; const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
...@@ -563,7 +563,7 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { ...@@ -563,7 +563,7 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
code = tjsonAddObject(pJson, jkScanLogicPlanTagCond, nodeToJson, pNode->pTagCond); code = tjsonAddObject(pJson, jkScanLogicPlanTagCond, nodeToJson, pNode->pTagCond);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkScanLogicPlanPartTags, pNode->pPartTags); code = nodeListToJson(pJson, jkScanLogicPlanGroupTags, pNode->pGroupTags);
} }
return code; return code;
...@@ -590,7 +590,7 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) { ...@@ -590,7 +590,7 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
code = jsonToNodeObject(pJson, jkScanLogicPlanTagCond, &pNode->pTagCond); code = jsonToNodeObject(pJson, jkScanLogicPlanTagCond, &pNode->pTagCond);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkScanLogicPlanPartTags, &pNode->pPartTags); code = jsonToNodeList(pJson, jkScanLogicPlanGroupTags, &pNode->pGroupTags);
} }
return code; return code;
...@@ -1432,7 +1432,8 @@ static const char* jkTableScanPhysiPlanTriggerType = "triggerType"; ...@@ -1432,7 +1432,8 @@ static const char* jkTableScanPhysiPlanTriggerType = "triggerType";
static const char* jkTableScanPhysiPlanWatermark = "watermark"; static const char* jkTableScanPhysiPlanWatermark = "watermark";
static const char* jkTableScanPhysiPlanTsColId = "tsColId"; static const char* jkTableScanPhysiPlanTsColId = "tsColId";
static const char* jkTableScanPhysiPlanFilesFactor = "FilesFactor"; static const char* jkTableScanPhysiPlanFilesFactor = "FilesFactor";
static const char* jkTableScanPhysiPlanPartitionTags = "PartitionTags"; static const char* jkTableScanPhysiPlanGroupTags = "GroupTags";
static const char* jkTableScanPhysiPlanGroupSort = "GroupSort";
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj; const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
...@@ -1487,7 +1488,10 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { ...@@ -1487,7 +1488,10 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
code = tjsonAddDoubleToObject(pJson, jkTableScanPhysiPlanFilesFactor, pNode->filesFactor); code = tjsonAddDoubleToObject(pJson, jkTableScanPhysiPlanFilesFactor, pNode->filesFactor);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkTableScanPhysiPlanPartitionTags, pNode->pPartitionTags); code = nodeListToJson(pJson, jkTableScanPhysiPlanGroupTags, pNode->pGroupTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanGroupSort, pNode->groupSort);
} }
return code; return code;
...@@ -1546,7 +1550,10 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { ...@@ -1546,7 +1550,10 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
code = tjsonGetDoubleValue(pJson, jkTableScanPhysiPlanFilesFactor, &pNode->filesFactor); code = tjsonGetDoubleValue(pJson, jkTableScanPhysiPlanFilesFactor, &pNode->filesFactor);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkTableScanPhysiPlanPartitionTags, &pNode->pPartitionTags); code = jsonToNodeList(pJson, jkTableScanPhysiPlanGroupTags, &pNode->pGroupTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanGroupSort, &pNode->groupSort);
} }
return code; return code;
...@@ -1727,6 +1734,7 @@ static const char* jkMergePhysiPlanMergeKeys = "MergeKeys"; ...@@ -1727,6 +1734,7 @@ static const char* jkMergePhysiPlanMergeKeys = "MergeKeys";
static const char* jkMergePhysiPlanTargets = "Targets"; static const char* jkMergePhysiPlanTargets = "Targets";
static const char* jkMergePhysiPlanNumOfChannels = "NumOfChannels"; static const char* jkMergePhysiPlanNumOfChannels = "NumOfChannels";
static const char* jkMergePhysiPlanSrcGroupId = "SrcGroupId"; static const char* jkMergePhysiPlanSrcGroupId = "SrcGroupId";
static const char* jkMergePhysiPlanGroupSort = "GroupSort";
static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) { static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) {
const SMergePhysiNode* pNode = (const SMergePhysiNode*)pObj; const SMergePhysiNode* pNode = (const SMergePhysiNode*)pObj;
...@@ -1744,6 +1752,9 @@ static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) { ...@@ -1744,6 +1752,9 @@ static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkMergePhysiPlanSrcGroupId, pNode->srcGroupId); code = tjsonAddIntegerToObject(pJson, jkMergePhysiPlanSrcGroupId, pNode->srcGroupId);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkMergePhysiPlanGroupSort, pNode->groupSort);
}
return code; return code;
} }
...@@ -1764,6 +1775,9 @@ static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) { ...@@ -1764,6 +1775,9 @@ static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkMergePhysiPlanSrcGroupId, &pNode->srcGroupId); code = tjsonGetIntValue(pJson, jkMergePhysiPlanSrcGroupId, &pNode->srcGroupId);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkMergePhysiPlanGroupSort, &pNode->groupSort);
}
return code; return code;
} }
......
...@@ -711,7 +711,7 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -711,7 +711,7 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pLogicNode->pTagCond); nodesDestroyNode(pLogicNode->pTagCond);
nodesDestroyNode(pLogicNode->pTagIndexCond); nodesDestroyNode(pLogicNode->pTagIndexCond);
taosArrayDestroy(pLogicNode->pSmaIndexes); taosArrayDestroy(pLogicNode->pSmaIndexes);
nodesDestroyList(pLogicNode->pPartTags); nodesDestroyList(pLogicNode->pGroupTags);
break; break;
} }
case QUERY_NODE_LOGIC_PLAN_JOIN: { case QUERY_NODE_LOGIC_PLAN_JOIN: {
...@@ -815,7 +815,7 @@ void nodesDestroyNode(SNode* pNode) { ...@@ -815,7 +815,7 @@ void nodesDestroyNode(SNode* pNode) {
STableScanPhysiNode* pPhyNode = (STableScanPhysiNode*)pNode; STableScanPhysiNode* pPhyNode = (STableScanPhysiNode*)pNode;
destroyScanPhysiNode((SScanPhysiNode*)pNode); destroyScanPhysiNode((SScanPhysiNode*)pNode);
nodesDestroyList(pPhyNode->pDynamicScanFuncs); nodesDestroyList(pPhyNode->pDynamicScanFuncs);
nodesDestroyList(pPhyNode->pPartitionTags); nodesDestroyList(pPhyNode->pGroupTags);
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_PROJECT: { case QUERY_NODE_PHYSICAL_PLAN_PROJECT: {
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
#define OPTIMIZE_FLAG_MASK(n) (1 << n) #define OPTIMIZE_FLAG_MASK(n) (1 << n)
#define OPTIMIZE_FLAG_OSD OPTIMIZE_FLAG_MASK(0) #define OPTIMIZE_FLAG_SCAN_PATH OPTIMIZE_FLAG_MASK(0)
#define OPTIMIZE_FLAG_PUSH_DOWN_CONDE OPTIMIZE_FLAG_MASK(1) #define OPTIMIZE_FLAG_PUSH_DOWN_CONDE OPTIMIZE_FLAG_MASK(1)
#define OPTIMIZE_FLAG_SET_MASK(val, mask) (val) |= (mask) #define OPTIMIZE_FLAG_SET_MASK(val, mask) (val) |= (mask)
...@@ -91,7 +91,7 @@ static bool scanPathOptHaveNormalCol(SNodeList* pList) { ...@@ -91,7 +91,7 @@ static bool scanPathOptHaveNormalCol(SNodeList* pList) {
} }
static bool scanPathOptMayBeOptimized(SLogicNode* pNode) { static bool scanPathOptMayBeOptimized(SLogicNode* pNode) {
if (OPTIMIZE_FLAG_TEST_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_OSD)) { if (OPTIMIZE_FLAG_TEST_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_SCAN_PATH)) {
return false; return false;
} }
if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) { if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) {
...@@ -241,7 +241,7 @@ static int32_t scanPathOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub ...@@ -241,7 +241,7 @@ static int32_t scanPathOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
if (TSDB_CODE_SUCCESS == code && (NULL != info.pDsoFuncs || NULL != info.pSdrFuncs)) { if (TSDB_CODE_SUCCESS == code && (NULL != info.pDsoFuncs || NULL != info.pSdrFuncs)) {
info.pScan->dataRequired = scanPathOptGetDataRequired(info.pSdrFuncs); info.pScan->dataRequired = scanPathOptGetDataRequired(info.pSdrFuncs);
info.pScan->pDynamicScanFuncs = info.pDsoFuncs; info.pScan->pDynamicScanFuncs = info.pDsoFuncs;
OPTIMIZE_FLAG_SET_MASK(info.pScan->node.optimizedFlag, OPTIMIZE_FLAG_OSD); OPTIMIZE_FLAG_SET_MASK(info.pScan->node.optimizedFlag, OPTIMIZE_FLAG_SCAN_PATH);
pCxt->optimized = true; pCxt->optimized = true;
} }
nodesDestroyList(info.pSdrFuncs); nodesDestroyList(info.pSdrFuncs);
...@@ -1073,7 +1073,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub ...@@ -1073,7 +1073,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0); SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0);
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) { if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
TSWAP(((SPartitionLogicNode*)pNode)->pPartitionKeys, pScan->pPartTags); TSWAP(((SPartitionLogicNode*)pNode)->pPartitionKeys, pScan->pGroupTags);
int32_t code = replaceLogicNode(pLogicSubplan, pNode, (SLogicNode*)pScan); int32_t code = replaceLogicNode(pLogicSubplan, pNode, (SLogicNode*)pScan);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
NODES_CLEAR_LIST(pNode->pChildren); NODES_CLEAR_LIST(pNode->pChildren);
...@@ -1083,7 +1083,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub ...@@ -1083,7 +1083,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
SNode* pGroupKey = NULL; SNode* pGroupKey = NULL;
FOREACH(pGroupKey, ((SAggLogicNode*)pNode)->pGroupKeys) { FOREACH(pGroupKey, ((SAggLogicNode*)pNode)->pGroupKeys) {
code = nodesListMakeStrictAppend( code = nodesListMakeStrictAppend(
&pScan->pPartTags, nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pGroupKey)->pParameterList, 0))); &pScan->pGroupTags, nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pGroupKey)->pParameterList, 0)));
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
break; break;
} }
...@@ -1091,7 +1091,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub ...@@ -1091,7 +1091,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
NODES_DESTORY_LIST(((SAggLogicNode*)pNode)->pGroupKeys); NODES_DESTORY_LIST(((SAggLogicNode*)pNode)->pGroupKeys);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = partTagsOptRebuildTbanme(pScan->pPartTags); code = partTagsOptRebuildTbanme(pScan->pGroupTags);
} }
return code; return code;
} }
...@@ -1212,7 +1212,7 @@ static bool rewriteTailOptNeedGroupSort(SIndefRowsFuncLogicNode* pIndef) { ...@@ -1212,7 +1212,7 @@ static bool rewriteTailOptNeedGroupSort(SIndefRowsFuncLogicNode* pIndef) {
} }
SNode* pChild = nodesListGetNode(pIndef->node.pChildren, 0); SNode* pChild = nodesListGetNode(pIndef->node.pChildren, 0);
return QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pChild) || return QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pChild) ||
(QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && NULL != ((SScanLogicNode*)pChild)->pPartTags); (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && NULL != ((SScanLogicNode*)pChild)->pGroupTags);
} }
static int32_t rewriteTailOptCreateSort(SIndefRowsFuncLogicNode* pIndef, SLogicNode** pOutput) { static int32_t rewriteTailOptCreateSort(SIndefRowsFuncLogicNode* pIndef, SLogicNode** pOutput) {
......
...@@ -520,12 +520,13 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp ...@@ -520,12 +520,13 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName); tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
pTableScan->dataRequired = pScanLogicNode->dataRequired; pTableScan->dataRequired = pScanLogicNode->dataRequired;
pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs); pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs);
pTableScan->pPartitionTags = nodesCloneList(pScanLogicNode->pPartTags); pTableScan->pGroupTags = nodesCloneList(pScanLogicNode->pGroupTags);
if ((NULL != pScanLogicNode->pDynamicScanFuncs && NULL == pTableScan->pDynamicScanFuncs) || if ((NULL != pScanLogicNode->pDynamicScanFuncs && NULL == pTableScan->pDynamicScanFuncs) ||
(NULL != pScanLogicNode->pPartTags && NULL == pTableScan->pPartitionTags)) { (NULL != pScanLogicNode->pGroupTags && NULL == pTableScan->pGroupTags)) {
nodesDestroyNode((SNode*)pTableScan); nodesDestroyNode((SNode*)pTableScan);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pTableScan->groupSort = pScanLogicNode->groupSort;
pTableScan->interval = pScanLogicNode->interval; pTableScan->interval = pScanLogicNode->interval;
pTableScan->offset = pScanLogicNode->offset; pTableScan->offset = pScanLogicNode->offset;
pTableScan->sliding = pScanLogicNode->sliding; pTableScan->sliding = pScanLogicNode->sliding;
...@@ -1330,6 +1331,7 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM ...@@ -1330,6 +1331,7 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM
pMerge->numOfChannels = pMergeLogicNode->numOfChannels; pMerge->numOfChannels = pMergeLogicNode->numOfChannels;
pMerge->srcGroupId = pMergeLogicNode->srcGroupId; pMerge->srcGroupId = pMergeLogicNode->srcGroupId;
pMerge->groupSort = pMergeLogicNode->groupSort;
int32_t code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc); int32_t code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc);
......
...@@ -362,7 +362,7 @@ static int32_t stbSplGetNumOfVgroups(SLogicNode* pNode) { ...@@ -362,7 +362,7 @@ static int32_t stbSplGetNumOfVgroups(SLogicNode* pNode) {
} }
static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode, static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
SNodeList* pMergeKeys, SLogicNode* pPartChild) { SNodeList* pMergeKeys, SLogicNode* pPartChild, bool groupSort) {
SMergeLogicNode* pMerge = (SMergeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE); SMergeLogicNode* pMerge = (SMergeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE);
if (NULL == pMerge) { if (NULL == pMerge) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -371,6 +371,7 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla ...@@ -371,6 +371,7 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
pMerge->srcGroupId = pCxt->groupId; pMerge->srcGroupId = pCxt->groupId;
pMerge->node.precision = pPartChild->precision; pMerge->node.precision = pPartChild->precision;
pMerge->pMergeKeys = pMergeKeys; pMerge->pMergeKeys = pMergeKeys;
pMerge->groupSort = groupSort;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pMerge->pInputs = nodesCloneList(pPartChild->pTargets); pMerge->pInputs = nodesCloneList(pPartChild->pTargets);
...@@ -430,7 +431,7 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo ...@@ -430,7 +431,7 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
SNodeList* pMergeKeys = NULL; SNodeList* pMergeKeys = NULL;
code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys); code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow); code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, false);
} }
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pMergeKeys); nodesDestroyList(pMergeKeys);
...@@ -497,12 +498,16 @@ static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo ...@@ -497,12 +498,16 @@ static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo
return code; return code;
} }
static void splSetTableScanType(SLogicNode* pNode, EScanType scanType) { static void stbSplSetTableMergeScan(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
((SScanLogicNode*)pNode)->scanType = scanType; SScanLogicNode* pScan = (SScanLogicNode*)pNode;
pScan->scanType = SCAN_TYPE_TABLE_MERGE;
if (NULL != pScan->pGroupTags) {
pScan->groupSort = true;
}
} else { } else {
if (1 == LIST_LENGTH(pNode->pChildren)) { if (1 == LIST_LENGTH(pNode->pChildren)) {
splSetTableScanType((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), scanType); stbSplSetTableMergeScan((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
} }
} }
} }
...@@ -515,7 +520,7 @@ static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSpl ...@@ -515,7 +520,7 @@ static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSpl
int32_t code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk, &pMergeKeys); int32_t code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild); code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild, true);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -524,13 +529,10 @@ static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSpl ...@@ -524,13 +529,10 @@ static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSpl
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
splSetTableScanType(pChild, SCAN_TYPE_TABLE_MERGE); stbSplSetTableMergeScan(pChild);
++(pCxt->groupId);
}
if (TSDB_CODE_SUCCESS == code) {
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT); SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
++(pCxt->groupId);
} else { } else {
nodesDestroyList(pMergeKeys); nodesDestroyList(pMergeKeys);
} }
...@@ -560,7 +562,7 @@ static int32_t stbSplSplitState(SSplitContext* pCxt, SStableSplitInfo* pInfo) { ...@@ -560,7 +562,7 @@ static int32_t stbSplSplitState(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) { static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
return ((SScanLogicNode*)pNode)->pPartTags; return ((SScanLogicNode*)pNode)->pGroupTags;
} else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) { } else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
return ((SPartitionLogicNode*)pNode)->pPartitionKeys; return ((SPartitionLogicNode*)pNode)->pPartitionKeys;
} else { } else {
...@@ -790,12 +792,29 @@ static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOut ...@@ -790,12 +792,29 @@ static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOut
return code; return code;
} }
static void stbSplSetScanPartSort(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
if (NULL != pScan->pGroupTags) {
pScan->groupSort = true;
}
} else {
if (1 == LIST_LENGTH(pNode->pChildren)) {
stbSplSetScanPartSort((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
}
}
}
static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartSort = NULL; SLogicNode* pPartSort = NULL;
SNodeList* pMergeKeys = NULL; SNodeList* pMergeKeys = NULL;
bool groupSort = ((SSortLogicNode*)pInfo->pSplitNode)->groupSort;
int32_t code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort, &pMergeKeys); int32_t code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pPartSort); code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pPartSort, groupSort);
}
if (TSDB_CODE_SUCCESS == code && groupSort) {
stbSplSetScanPartSort(pPartSort);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
...@@ -830,7 +849,7 @@ static int32_t stbSplSplitScanNodeForJoin(SSplitContext* pCxt, SLogicSubplan* pS ...@@ -830,7 +849,7 @@ static int32_t stbSplSplitScanNodeForJoin(SSplitContext* pCxt, SLogicSubplan* pS
SNodeList* pMergeKeys = NULL; SNodeList* pMergeKeys = NULL;
int32_t code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pScan), &pMergeKeys); int32_t code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pScan), &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, (SLogicNode*)pScan); code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, (SLogicNode*)pScan, false);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pSubplan->pChildren, code = nodesListMakeStrictAppend(&pSubplan->pChildren,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册