planSpliter.c 32.5 KB
Newer Older
X
Xiaoyu Wang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

X
Xiaoyu Wang 已提交
16
#include "functionMgt.h"
X
Xiaoyu Wang 已提交
17
#include "planInt.h"
X
Xiaoyu Wang 已提交
18

X
Xiaoyu Wang 已提交
19
// Builds a flag value with only bit `n` set. The argument is parenthesised so that
// expressions such as SPLIT_FLAG_MASK(a | b) shift by the whole expression.
#define SPLIT_FLAG_MASK(n) (1 << (n))

// Flag stored in a subplan's splitFlag by the super-table split rule.
#define SPLIT_FLAG_STABLE_SPLIT SPLIT_FLAG_MASK(0)

// ORs `mask` into the lvalue `val`. The whole expansion is parenthesised so the
// macro can be embedded in a larger expression without precedence surprises.
#define SPLIT_FLAG_SET_MASK(val, mask)  ((val) |= (mask))
// Evaluates to true when at least one bit of `mask` is set in `val`.
#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)

typedef struct SSplitContext {
27 28 29 30
  SPlanContext* pPlanCxt;
  uint64_t      queryId;
  int32_t       groupId;
  bool          split;
X
Xiaoyu Wang 已提交
31 32
} SSplitContext;

33
// Signature of a split rule: receives the split context and a logic subplan and
// returns a TSDB status code.
typedef int32_t (*FSplit)(SSplitContext* pCxt, SLogicSubplan* pSubplan);
X
Xiaoyu Wang 已提交
34 35

typedef struct SSplitRule {
X
Xiaoyu Wang 已提交
36
  char*  pName;
X
Xiaoyu Wang 已提交
37 38 39
  FSplit splitFunc;
} SSplitRule;

40
// Matcher callback used by splMatch(): inspects one subplan and returns true when it
// contains a node to split on; pInfo is the rule-specific result structure.
typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, void* pInfo);
X
Xiaoyu Wang 已提交
41

X
Xiaoyu Wang 已提交
42 43 44 45 46 47 48 49 50 51 52
// Walks down from pNode through single-child nodes until a scan node is reached and
// swaps that scan's vgroup list with pSubplan->pVgroupList. Does nothing when the
// walk hits a node that does not have exactly one child.
static void splSetSubplanVgroups(SLogicSubplan* pSubplan, SLogicNode* pNode) {
  while (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) {
    if (1 != LIST_LENGTH(pNode->pChildren)) {
      return;
    }
    pNode = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
  }
  TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pNode)->pVgroupList);
}

// Creates a SUBPLAN_TYPE_SCAN subplan rooted at pNode.
// The subplan takes the context's queryId / current groupId, pNode is detached from
// its parent, the vgroup list of the underlying scan is swapped into the subplan,
// and `flag` is OR-ed into its splitFlag.
// Returns NULL when the subplan node cannot be allocated.
static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SLogicNode* pNode, int32_t flag) {
  SLogicSubplan* pScanSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
  if (NULL != pScanSubplan) {
    pScanSubplan->id.queryId = pCxt->queryId;
    pScanSubplan->id.groupId = pCxt->groupId;
    pScanSubplan->subplanType = SUBPLAN_TYPE_SCAN;
    pScanSubplan->pNode = pNode;
    pScanSubplan->pNode->pParent = NULL;
    splSetSubplanVgroups(pScanSubplan, pNode);
    SPLIT_FLAG_SET_MASK(pScanSubplan->splitFlag, flag);
  }
  return pScanSubplan;
}

67
// Creates an exchange node standing in for pChild.
// The node's srcGroupId is the context's current groupId, and its precision and
// targets are copied from pChild.
// On success the new node is stored in *pOutput. On failure *pOutput is left
// untouched and nothing allocated here is left behind.
static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SExchangeLogicNode** pOutput) {
  SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pExchange->srcGroupId = pCxt->groupId;
  pExchange->node.precision = pChild->precision;
  pExchange->node.pTargets = nodesCloneList(pChild->pTargets);
  if (NULL == pExchange->node.pTargets) {
    // The half-built node is not reachable by the caller, so release it here.
    nodesDestroyNode((SNode*)pExchange);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  *pOutput = pExchange;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
83 84 85 86 87
// Replaces pSplitNode inside pSubplan with a freshly created exchange node and, on
// success, sets the subplan's type to `subplanType`. On failure the exchange node
// (if one was created) is destroyed and the subplan type is left unchanged.
static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
                                               ESubplanType subplanType) {
  SExchangeLogicNode* pNewExchange = NULL;
  int32_t             rc = splCreateExchangeNode(pCxt, pSplitNode, &pNewExchange);
  if (TSDB_CODE_SUCCESS == rc) {
    rc = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pNewExchange);
  }
  if (TSDB_CODE_SUCCESS != rc) {
    nodesDestroyNode((SNode*)pNewExchange);
    return rc;
  }
  pSubplan->subplanType = subplanType;
  return rc;
}

98 99
// Depth-first search over the subplan tree for a subplan accepted by `func`.
// `func` is skipped for a subplan whose splitFlag already has a bit of `flag` set,
// but its child subplans are still visited. Returns true as soon as `func` reports
// a match; `func` fills pInfo.
static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag, FSplFindSplitNode func, void* pInfo) {
  if (!SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, flag) && func(pCxt, pSubplan, pInfo)) {
    return true;
  }

  SNode* pChildSubplan = NULL;
  FOREACH(pChildSubplan, pSubplan->pChildren) {
    if (splMatch(pCxt, (SLogicSubplan*)pChildSubplan, flag, func, pInfo)) {
      return true;
    }
  }
  return false;
}

X
Xiaoyu Wang 已提交
113
typedef struct SStableSplitInfo {
X
Xiaoyu Wang 已提交
114 115
  SLogicNode*    pSplitNode;
  SLogicSubplan* pSubplan;
X
Xiaoyu Wang 已提交
116 117
} SStableSplitInfo;

X
Xiaoyu Wang 已提交
118 119 120
// Returns true when pFuncs contains at least one function that is neither a window
// pseudo-column function nor a function reported by fmIsDistExecFunc().
static bool stbSplHasGatherExecFunc(const SNodeList* pFuncs) {
  SNode* pItem = NULL;
  FOREACH(pItem, pFuncs) {
    SFunctionNode* pFunction = (SFunctionNode*)pItem;
    if (!(fmIsWindowPseudoColumnFunc(pFunction->funcId) || fmIsDistExecFunc(pFunction->funcId))) {
      return true;
    }
  }
  return false;
}

129 130
// A scan is treated as multi-table when it has more than one vgroup, or when the
// query is a stream query and the scanned table is a super table.
static bool stbSplIsMultiTbScan(bool streamQuery, SScanLogicNode* pScan) {
  if (NULL != pScan->pVgroupList && pScan->pVgroupList->numOfVgroups > 1) {
    return true;
  }
  return streamQuery && TSDB_SUPER_TABLE == pScan->tableType;
}

134
// Returns true when pNode has exactly one child and that child is a scan node that
// stbSplIsMultiTbScan() accepts.
static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) {
  if (1 != LIST_LENGTH(pNode->pChildren)) {
    return false;
  }
  SNode* pOnlyChild = nodesListGetNode(pNode->pChildren, 0);
  if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pOnlyChild)) {
    return false;
  }
  return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pOnlyChild);
}

142
// Decides whether pNode itself is a split point for the super-table rule:
//  - AGG:    no gather-exec function and a multi-table scan directly below;
//  - WINDOW: interval windows only, with the same two conditions;
//  - SORT:   a multi-table scan directly below;
//  - SCAN:   the scan itself is multi-table.
// Every other node type is never a split point.
static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
  bool needSplit = false;
  switch (nodeType(pNode)) {
    case QUERY_NODE_LOGIC_PLAN_AGG: {
      SAggLogicNode* pAgg = (SAggLogicNode*)pNode;
      needSplit = !stbSplHasGatherExecFunc(pAgg->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
      break;
    }
    case QUERY_NODE_LOGIC_PLAN_WINDOW: {
      SWindowLogicNode* pWin = (SWindowLogicNode*)pNode;
      needSplit = WINDOW_TYPE_INTERVAL == pWin->winType && !stbSplHasGatherExecFunc(pWin->pFuncs) &&
                  stbSplHasMultiTbScan(streamQuery, pNode);
      break;
    }
    case QUERY_NODE_LOGIC_PLAN_SORT:
      needSplit = stbSplHasMultiTbScan(streamQuery, pNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_SCAN:
      needSplit = stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
      break;
    default:
      break;
  }
  return needSplit;
}

163 164
// Pre-order search of the logic tree rooted at pNode for the first node accepted by
// stbSplNeedSplit(). Returns that node, or NULL when none qualifies.
static SLogicNode* stbSplMatchByNode(bool streamQuery, SLogicNode* pNode) {
  if (stbSplNeedSplit(streamQuery, pNode)) {
    return pNode;
  }

  SNode* pSub = NULL;
  FOREACH(pSub, pNode->pChildren) {
    SLogicNode* pHit = stbSplMatchByNode(streamQuery, (SLogicNode*)pSub);
    if (NULL != pHit) {
      return pHit;
    }
  }
  return NULL;
}

177 178
// splMatch() callback for the super-table rule: looks for a split point inside
// pSubplan's tree and, when found, records it together with the subplan in pInfo.
// pInfo is left untouched when nothing matches.
static bool stbSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SStableSplitInfo* pInfo) {
  SLogicNode* pFound = stbSplMatchByNode(pCxt->pPlanCxt->streamQuery, pSubplan->pNode);
  if (NULL == pFound) {
    return false;
  }
  pInfo->pSplitNode = pFound;
  pInfo->pSubplan = pSubplan;
  return true;
}

X
Xiaoyu Wang 已提交
186 187 188 189 190 191 192 193
// Derives, for every function in pFuncs, a "partial" and a "merge" counterpart and
// appends them to *pPartialFuncs and *pMergeFuncs respectively.
// Window pseudo-column functions are simply cloned into both lists; all other
// functions are rewritten through fmGetDistMethod().
//
// On failure both output lists are destroyed AND reset to NULL, so that callers who
// later destroy the nodes owning those list pointers do not free them a second time.
static int32_t stbSplRewriteFuns(const SNodeList* pFuncs, SNodeList** pPartialFuncs, SNodeList** pMergeFuncs) {
  SNode* pNode = NULL;
  FOREACH(pNode, pFuncs) {
    SFunctionNode* pFunc = (SFunctionNode*)pNode;
    SFunctionNode* pPartFunc = NULL;
    SFunctionNode* pMergeFunc = NULL;
    int32_t        code = TSDB_CODE_SUCCESS;
    if (fmIsWindowPseudoColumnFunc(pFunc->funcId)) {
      pPartFunc = (SFunctionNode*)nodesCloneNode(pNode);
      pMergeFunc = (SFunctionNode*)nodesCloneNode(pNode);
      if (NULL == pPartFunc || NULL == pMergeFunc) {
        nodesDestroyNode((SNode*)pPartFunc);
        nodesDestroyNode((SNode*)pMergeFunc);
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    } else {
      code = fmGetDistMethod(pFunc, &pPartFunc, &pMergeFunc);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeStrictAppend(pPartialFuncs, (SNode*)pPartFunc);
      if (TSDB_CODE_SUCCESS != code) {
        // The merge counterpart has not been handed to any list yet; without this it
        // would be lost. (The partial one was passed to the strict append, which is
        // presumed to dispose of a node it fails to append — confirm.)
        nodesDestroyNode((SNode*)pMergeFunc);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeStrictAppend(pMergeFuncs, (SNode*)pMergeFunc);
    }
    if (TSDB_CODE_SUCCESS != code) {
      nodesDestroyList(*pPartialFuncs);
      *pPartialFuncs = NULL;
      nodesDestroyList(*pMergeFuncs);
      *pMergeFuncs = NULL;
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

// Makes sure pFuncs contains a FUNCTION_TYPE_WSTARTTS function and reports its
// position through *pIndex.
// If one already exists its index is returned; otherwise a new "_wstartts" function
// node is created, initialised through fmGetFuncInfo() and appended at the end
// (index == previous list length).
// The new node is released if initialisation fails.
static int32_t stbSplAppendWStart(SNodeList* pFuncs, int32_t* pIndex) {
  int32_t index = 0;
  SNode*  pFunc = NULL;
  FOREACH(pFunc, pFuncs) {
    if (FUNCTION_TYPE_WSTARTTS == ((SFunctionNode*)pFunc)->funcType) {
      *pIndex = index;
      return TSDB_CODE_SUCCESS;
    }
    ++index;
  }

  SFunctionNode* pWStart = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
  if (NULL == pWStart) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  strcpy(pWStart->functionName, "_wstartts");
  // The node's own address makes the alias unique; %p requires a void* argument.
  snprintf(pWStart->node.aliasName, sizeof(pWStart->node.aliasName), "%s.%p", pWStart->functionName, (void*)pWStart);
  int32_t code = fmGetFuncInfo(pWStart, NULL, 0);
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListStrictAppend(pFuncs, (SNode*)pWStart);
  } else {
    // Not appended to any list yet, so nobody else would ever free it.
    nodesDestroyNode((SNode*)pWStart);
  }
  *pIndex = index;
  return code;
}

// Splits an interval window node into a "partial" window (returned through
// *pPartWindow) and the existing node, which becomes the "merge" window.
//
// The function list, targets and children are detached from pMergeWindow before it
// is cloned, so the clone does not copy them. Afterwards:
//  - the merge window gets its original targets back and the merge-side functions;
//  - the partial window takes over the children and the partial-side functions (plus
//    a window-start function), and gets targets generated from those functions;
//  - the merge window's pTspk is replaced by a clone of the partial window's target
//    at the window-start function's index.
//
// If the clone itself fails, every detached list is put back so pMergeWindow is left
// exactly as it was and nothing is orphaned.
static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogicNode** pPartWindow) {
  SNodeList* pFunc = pMergeWindow->pFuncs;
  pMergeWindow->pFuncs = NULL;
  SNodeList* pTargets = pMergeWindow->node.pTargets;
  pMergeWindow->node.pTargets = NULL;
  SNodeList* pChildren = pMergeWindow->node.pChildren;
  pMergeWindow->node.pChildren = NULL;

  SWindowLogicNode* pPartWin = (SWindowLogicNode*)nodesCloneNode((SNode*)pMergeWindow);
  if (NULL == pPartWin) {
    pMergeWindow->pFuncs = pFunc;
    pMergeWindow->node.pTargets = pTargets;
    pMergeWindow->node.pChildren = pChildren;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMergeWindow->node.pTargets = pTargets;
  pPartWin->node.pChildren = pChildren;

  int32_t index = 0;
  int32_t code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, &pMergeWindow->pFuncs);
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplAppendWStart(pPartWin->pFuncs, &index);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = createColumnByRewriteExprs(pPartWin->pFuncs, &pPartWin->node.pTargets);
  }
  if (TSDB_CODE_SUCCESS == code) {
    nodesDestroyNode(pMergeWindow->pTspk);
    pMergeWindow->pTspk = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index));
    if (NULL == pMergeWindow->pTspk) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // The original function list has been replaced by the partial/merge lists.
  nodesDestroyList(pFunc);
  if (TSDB_CODE_SUCCESS == code) {
    *pPartWindow = (SLogicNode*)pPartWin;
  } else {
    nodesDestroyNode((SNode*)pPartWin);
  }

  return code;
}

X
Xiaoyu Wang 已提交
288 289
// Creates a merge node that combines the per-vgroup outputs of pPartChild.
//
// The number of channels is the vgroup count of the scan directly below pPartChild.
// When pSubplan is NULL the merge node is appended as a child of pSplitNode and its
// targets are copied from pPartChild; otherwise the merge node replaces pSplitNode
// inside pSubplan and copies pSplitNode's targets.
//
// Ownership of pMergeKeys: it moves into the merge node only on success. On every
// failure path it is left with the caller, so the caller decides whether to free it
// and it is never released twice.
static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
                                     SNodeList* pMergeKeys, SLogicNode* pPartChild) {
  SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pPartChild->pChildren, 0);
  if (NULL == pScan || NULL == pScan->pVgroupList) {
    // Without a vgroup list the channel count is unknown; fail instead of crashing.
    return TSDB_CODE_PLAN_INTERNAL_ERROR;
  }

  SMergeLogicNode* pMerge = (SMergeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE);
  if (NULL == pMerge) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pMerge->numOfChannels = pScan->pVgroupList->numOfVgroups;
  pMerge->srcGroupId = pCxt->groupId;
  pMerge->node.precision = pPartChild->precision;
  pMerge->pMergeKeys = pMergeKeys;

  int32_t code = TSDB_CODE_SUCCESS;
  pMerge->pInputs = nodesCloneList(pPartChild->pTargets);
  // NULL == pSubplan means 'merge node' replaces 'split node'.
  if (NULL == pSubplan) {
    pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets);
  } else {
    pMerge->node.pTargets = nodesCloneList(pSplitNode->pTargets);
  }
  if (NULL == pMerge->node.pTargets || NULL == pMerge->pInputs) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }
  if (TSDB_CODE_SUCCESS == code) {
    if (NULL == pSubplan) {
      code = nodesListMakeAppend(&pSplitNode->pChildren, (SNode*)pMerge);
    } else {
      code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge);
    }
  }
  if (TSDB_CODE_SUCCESS != code) {
    // Detach the caller's key list before tearing the node down (see ownership note).
    pMerge->pMergeKeys = NULL;
    nodesDestroyNode((SNode*)pMerge);
  }
  return code;
}

X
Xiaoyu Wang 已提交
323 324 325 326
// Creates an exchange node describing pPartChild's output and appends it to
// pParent's children. The exchange node is destroyed if it cannot be appended.
static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent, SLogicNode* pPartChild) {
  SExchangeLogicNode* pExchange = NULL;
  int32_t             code = splCreateExchangeNode(pCxt, pPartChild, &pExchange);
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeAppend(&pParent->pChildren, (SNode*)pExchange);
    if (TSDB_CODE_SUCCESS != code) {
      // The plain append is not given ownership on failure, so release the node here.
      nodesDestroyNode((SNode*)pExchange);
    }
  }
  return code;
}

332
// Appends to *pMergeKeys a single ORDER BY expression over a clone of pWStartTs,
// ascending with NULLs first.
static int32_t stbSplCreateMergeKeysForInterval(SNode* pWStartTs, SNodeList** pMergeKeys) {
  SOrderByExprNode* pKey = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
  if (NULL == pKey) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNode* pKeyExpr = nodesCloneNode(pWStartTs);
  pKey->pExpr = pKeyExpr;
  if (NULL == pKeyExpr) {
    nodesDestroyNode((SNode*)pKey);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pKey->order = ORDER_ASC;
  pKey->nullOrder = NULL_ORDER_FIRST;
  return nodesListMakeStrictAppend(pMergeKeys, (SNode*)pKey);
}

347
// Batch-query split of an interval window:
//  1. split the window into a partial window (hash algorithm) and a merge window;
//  2. build merge keys on the merge window's pTspk and hang a merge node below the
//     merge window;
//  3. move the partial window into a new scan subplan under pInfo->pSubplan.
// The owning subplan is marked SUBPLAN_TYPE_MERGE regardless of the outcome.
//
// The partial window is released on every failure that happens before a scan
// subplan has taken it over.
static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pPartWindow = NULL;
  int32_t     code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
  if (TSDB_CODE_SUCCESS == code) {
    ((SWindowLogicNode*)pPartWindow)->intervalAlgo = INTERVAL_ALGO_HASH;
    ((SWindowLogicNode*)pInfo->pSplitNode)->intervalAlgo = INTERVAL_ALGO_MERGE;
    SNodeList* pMergeKeys = NULL;
    code = stbSplCreateMergeKeysForInterval(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys);
    if (TSDB_CODE_SUCCESS == code) {
      code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow);
    }
    if (TSDB_CODE_SUCCESS != code) {
      nodesDestroyList(pMergeKeys);
      // Nothing owns the partial window yet.
      nodesDestroyNode((SNode*)pPartWindow);
      pPartWindow = NULL;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
    if (NULL == pScanSubplan) {
      // No subplan was created to take over the partial window.
      nodesDestroyNode((SNode*)pPartWindow);
    }
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  return code;
}

370
// Stream-query split of an interval window: the partial window uses the
// STREAM_SEMI algorithm, the remaining window uses STREAM_FINAL and reads the
// partial results through an exchange node; the partial window moves into a new
// scan subplan under pInfo->pSubplan.
// The owning subplan is marked SUBPLAN_TYPE_MERGE regardless of the outcome.
//
// The partial window is released on every failure that happens before a scan
// subplan has taken it over.
static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pPartWindow = NULL;
  int32_t     code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
  if (TSDB_CODE_SUCCESS == code) {
    ((SWindowLogicNode*)pPartWindow)->intervalAlgo = INTERVAL_ALGO_STREAM_SEMI;
    ((SWindowLogicNode*)pInfo->pSplitNode)->intervalAlgo = INTERVAL_ALGO_STREAM_FINAL;
    code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
    if (TSDB_CODE_SUCCESS != code) {
      // Nothing owns the partial window yet.
      nodesDestroyNode((SNode*)pPartWindow);
      pPartWindow = NULL;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
    if (NULL == pScanSubplan) {
      // No subplan was created to take over the partial window.
      nodesDestroyNode((SNode*)pPartWindow);
    }
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  return code;
}

386
// Dispatches the interval split to the stream or batch implementation according to
// the planner context's streamQuery flag.
static int32_t stbSplSplitInterval(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  return pCxt->pPlanCxt->streamQuery ? stbSplSplitIntervalForStream(pCxt, pInfo)
                                     : stbSplSplitIntervalForBatch(pCxt, pInfo);
}

394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
// Session-window splitting has no implementation here: always reports an internal
// planner error and ignores both arguments.
static int32_t stbSplSplitSession(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  (void)pCxt;
  (void)pInfo;
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
}

// Splits a window node according to its window type. Only interval and session
// windows are dispatched; any other type yields an internal planner error.
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SWindowLogicNode* pWindow = (SWindowLogicNode*)pInfo->pSplitNode;
  if (WINDOW_TYPE_INTERVAL == pWindow->winType) {
    return stbSplSplitInterval(pCxt, pInfo);
  }
  if (WINDOW_TYPE_SESSION == pWindow->winType) {
    return stbSplSplitSession(pCxt, pInfo);
  }
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
}

X
Xiaoyu Wang 已提交
410 411 412 413 414 415 416 417 418 419 420
// Splits an aggregate node into a "partial" aggregate (returned through *pOutput)
// and the existing node, which becomes the "merge" aggregate.
//
// Aggregate functions, group keys, targets and children are detached from pMergeAgg
// before it is cloned, so the clone does not copy them. Afterwards:
//  - the merge aggregate gets its original targets back, the merge-side functions
//    and, when group keys exist, group keys cloned from the partial node's targets;
//  - the partial aggregate takes over the children, the original group keys and the
//    partial-side functions; its targets are generated from group keys + functions.
//
// Detached lists are handed back to a node as early as possible, so no failure path
// orphans them; if the clone itself fails pMergeAgg is restored unchanged.
static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) {
  SNodeList* pFunc = pMergeAgg->pAggFuncs;
  pMergeAgg->pAggFuncs = NULL;
  SNodeList* pGroupKeys = pMergeAgg->pGroupKeys;
  pMergeAgg->pGroupKeys = NULL;
  SNodeList* pTargets = pMergeAgg->node.pTargets;
  pMergeAgg->node.pTargets = NULL;
  SNodeList* pChildren = pMergeAgg->node.pChildren;
  pMergeAgg->node.pChildren = NULL;

  SAggLogicNode* pPartAgg = (SAggLogicNode*)nodesCloneNode((SNode*)pMergeAgg);
  if (NULL == pPartAgg) {
    pMergeAgg->pAggFuncs = pFunc;
    pMergeAgg->pGroupKeys = pGroupKeys;
    pMergeAgg->node.pTargets = pTargets;
    pMergeAgg->node.pChildren = pChildren;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // From here on every detached list except pFunc has an owner again.
  pMergeAgg->node.pTargets = pTargets;
  pPartAgg->node.pChildren = pChildren;

  int32_t code = TSDB_CODE_SUCCESS;
  if (NULL != pGroupKeys) {
    pPartAgg->pGroupKeys = pGroupKeys;
    code = createColumnByRewriteExprs(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets);
    if (TSDB_CODE_SUCCESS == code) {
      pMergeAgg->pGroupKeys = nodesCloneList(pPartAgg->node.pTargets);
      if (NULL == pMergeAgg->pGroupKeys) {
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, &pMergeAgg->pAggFuncs);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = createColumnByRewriteExprs(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets);
  }

  // The original function list has been replaced by the partial/merge lists.
  nodesDestroyList(pFunc);
  if (TSDB_CODE_SUCCESS == code) {
    *pOutput = (SLogicNode*)pPartAgg;
  } else {
    nodesDestroyNode((SNode*)pPartAgg);
  }

  return code;
}

// Splits an aggregate: the partial aggregate moves into a new scan subplan under
// pInfo->pSubplan, and the remaining (merge) aggregate reads it through an exchange
// node. The owning subplan is marked SUBPLAN_TYPE_MERGE regardless of the outcome.
//
// The partial aggregate is released on every failure that happens before a scan
// subplan has taken it over.
static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pPartAgg = NULL;
  int32_t     code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg);
    if (TSDB_CODE_SUCCESS != code) {
      // Nothing owns the partial aggregate yet.
      nodesDestroyNode((SNode*)pPartAgg);
      pPartAgg = NULL;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
    if (NULL == pScanSubplan) {
      // No subplan was created to take over the partial aggregate.
      nodesDestroyNode((SNode*)pPartAgg);
    }
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  return code;
}

X
Xiaoyu Wang 已提交
470
// Builds a column node that refers to pExpr by its alias: column name and alias are
// both set to pExpr's alias, the result type is copied, and the table alias is
// carried over when pExpr is itself a column. Returns NULL on allocation failure.
static SNode* stbSplCreateColumnNode(SExprNode* pExpr) {
  SColumnNode* pColumn = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
  if (NULL != pColumn) {
    if (QUERY_NODE_COLUMN == nodeType(pExpr)) {
      strcpy(pColumn->tableAlias, ((SColumnNode*)pExpr)->tableAlias);
    }
    strcpy(pColumn->colName, pExpr->aliasName);
    strcpy(pColumn->node.aliasName, pExpr->aliasName);
    pColumn->node.resType = pExpr->resType;
  }
  return (SNode*)pColumn;
}

// Creates an ORDER BY expression over a clone of pCol that keeps the ordering and
// NULL ordering of pSortKey. Returns NULL when an allocation or the clone fails.
static SNode* stbSplCreateOrderByExpr(SOrderByExprNode* pSortKey, SNode* pCol) {
  SOrderByExprNode* pOrderBy = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
  if (NULL == pOrderBy) {
    return NULL;
  }

  SNode* pColCopy = nodesCloneNode(pCol);
  pOrderBy->pExpr = pColCopy;
  if (NULL == pColCopy) {
    nodesDestroyNode((SNode*)pOrderBy);
    return NULL;
  }

  pOrderBy->order = pSortKey->order;
  pOrderBy->nullOrder = pSortKey->nullOrder;
  return (SNode*)pOrderBy;
}

// Translates sort keys into merge keys expressed over pTargets.
// For each sort key, every target whose column name equals the key expression's
// alias yields one merge key. When no target matches, a new column node is created
// for the key, used for the merge key and then appended to pTargets.
// On success the new list is stored in *pOutput; on failure it is destroyed and
// *pOutput is left untouched.
static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets, SNodeList** pOutput) {
  int32_t    rc = TSDB_CODE_SUCCESS;
  SNodeList* pKeys = NULL;
  SNode*     pItem = NULL;
  FOREACH(pItem, pSortKeys) {
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)pItem;
    bool              matched = false;
    SNode*            pTarget = NULL;
    FOREACH(pTarget, pTargets) {
      if (0 == strcmp(((SExprNode*)pSortKey->pExpr)->aliasName, ((SColumnNode*)pTarget)->colName)) {
        rc = nodesListMakeStrictAppend(&pKeys, stbSplCreateOrderByExpr(pSortKey, pTarget));
        if (TSDB_CODE_SUCCESS != rc) {
          break;
        }
        matched = true;
      }
    }
    if (TSDB_CODE_SUCCESS == rc && !matched) {
      SNode* pNewCol = stbSplCreateColumnNode((SExprNode*)pSortKey->pExpr);
      rc = nodesListMakeStrictAppend(&pKeys, stbSplCreateOrderByExpr(pSortKey, pNewCol));
      if (TSDB_CODE_SUCCESS != rc) {
        nodesDestroyNode(pNewCol);
      } else {
        rc = nodesListStrictAppend(pTargets, pNewCol);
      }
    }
    if (TSDB_CODE_SUCCESS != rc) {
      break;
    }
  }

  if (TSDB_CODE_SUCCESS != rc) {
    nodesDestroyList(pKeys);
    return rc;
  }
  *pOutput = pKeys;
  return rc;
}

// Creates the "partial" sort node for a super-table sort split.
// The sort keys and children are detached from pSort before it is cloned (the
// targets are not, so the clone gets its own copy) and are then moved into the
// clone. Merge keys matching the sort keys are built over the clone's targets.
//
// On success the partial sort and the merge keys are returned through the output
// parameters. If the clone itself fails, the detached lists are put back so pSort
// is left unchanged and nothing is orphaned.
static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOutputPartSort,
                                        SNodeList** pOutputMergeKeys) {
  SNodeList* pSortKeys = pSort->pSortKeys;
  pSort->pSortKeys = NULL;
  SNodeList* pChildren = pSort->node.pChildren;
  pSort->node.pChildren = NULL;

  SSortLogicNode* pPartSort = (SSortLogicNode*)nodesCloneNode((SNode*)pSort);
  if (NULL == pPartSort) {
    pSort->pSortKeys = pSortKeys;
    pSort->node.pChildren = pChildren;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pMergeKeys = NULL;
  pPartSort->node.pChildren = pChildren;
  pPartSort->pSortKeys = pSortKeys;
  int32_t code = stbSplCreateMergeKeys(pPartSort->pSortKeys, pPartSort->node.pTargets, &pMergeKeys);

  if (TSDB_CODE_SUCCESS == code) {
    *pOutputPartSort = (SLogicNode*)pPartSort;
    *pOutputMergeKeys = pMergeKeys;
  } else {
    nodesDestroyNode((SNode*)pPartSort);
    nodesDestroyList(pMergeKeys);
  }

  return code;
}

// Splits a sort: a merge node built on the sort keys replaces the sort node inside
// pInfo->pSubplan, and the partial sort moves into a new scan subplan under it.
// The owning subplan is marked SUBPLAN_TYPE_MERGE regardless of the outcome.
//
// The partial sort is released on every failure that happens before a scan subplan
// has taken it over.
// NOTE(review): whether pMergeKeys is still owned here after stbSplCreateMergeNode()
// fails depends on that function's failure contract, so it is not freed here — confirm.
static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pPartSort = NULL;
  SNodeList*  pMergeKeys = NULL;
  int32_t     code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort, &pMergeKeys);
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pPartSort);
    if (TSDB_CODE_SUCCESS != code) {
      // Nothing owns the partial sort yet.
      nodesDestroyNode((SNode*)pPartSort);
      pPartSort = NULL;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pPartSort, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
    if (NULL == pScanSubplan) {
      // No subplan was created to take over the partial sort.
      nodesDestroyNode((SNode*)pPartSort);
    }
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  return code;
}

X
Xiaoyu Wang 已提交
583
// Splits a bare multi-table scan: the scan is replaced by an exchange node inside
// pInfo->pSubplan (which becomes SUBPLAN_TYPE_MERGE) and is then moved into a new
// scan subplan appended to that subplan's children.
static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicSubplan* pOwner = pInfo->pSubplan;
  int32_t        rc = splCreateExchangeNodeForSubplan(pCxt, pOwner, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE);
  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }
  return nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
                                   (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
}

X
Xiaoyu Wang 已提交
592
// Super-table split rule.
// Does nothing for rSma queries or when no split point is found. Otherwise the
// matched node is split according to its type (agg / window / sort / scan), the
// context's groupId is advanced and pCxt->split is set — both happen even when the
// type-specific split reports an error, whose code is returned.
static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  if (pCxt->pPlanCxt->rSmaQuery) {
    return TSDB_CODE_SUCCESS;
  }

  SStableSplitInfo info = {0};
  bool found = splMatch(pCxt, pSubplan, SPLIT_FLAG_STABLE_SPLIT, (FSplFindSplitNode)stbSplFindSplitNode, &info);
  if (!found) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t rc = TSDB_CODE_SUCCESS;
  switch (nodeType(info.pSplitNode)) {
    case QUERY_NODE_LOGIC_PLAN_AGG:
      rc = stbSplSplitAggNode(pCxt, &info);
      break;
    case QUERY_NODE_LOGIC_PLAN_WINDOW:
      rc = stbSplSplitWindowNode(pCxt, &info);
      break;
    case QUERY_NODE_LOGIC_PLAN_SORT:
      rc = stbSplSplitSortNode(pCxt, &info);
      break;
    case QUERY_NODE_LOGIC_PLAN_SCAN:
      rc = stbSplSplitScanNode(pCxt, &info);
      break;
    default:
      break;
  }

  pCxt->groupId += 1;
  pCxt->split = true;
  return rc;
}

X
Xiaoyu Wang 已提交
625 626 627 628 629 630 631
// Result of the single-table-join split matcher.
typedef struct SSigTbJoinSplitInfo {
  SJoinLogicNode* pJoin;       // the matched join node
  SLogicNode*     pSplitNode;  // the join child (index 1) that gets split off
  SLogicSubplan*  pSubplan;    // subplan whose tree contains pJoin
} SSigTbJoinSplitInfo;

// A join needs splitting when it is flagged as a single-table join and neither of
// its first two children is an exchange node.
static bool sigTbJoinSplNeedSplit(SJoinLogicNode* pJoin) {
  if (!pJoin->isSingleTableJoin) {
    return false;
  }
  return !(QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(nodesListGetNode(pJoin->node.pChildren, 0)) ||
           QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(nodesListGetNode(pJoin->node.pChildren, 1)));
}

X
Xiaoyu Wang 已提交
639 640
// Pre-order search of the logic tree rooted at pNode for the first join node that
// sigTbJoinSplNeedSplit() accepts. Returns NULL when there is none.
static SJoinLogicNode* sigTbJoinSplMatchByNode(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pNode) && sigTbJoinSplNeedSplit((SJoinLogicNode*)pNode)) {
    return (SJoinLogicNode*)pNode;
  }

  SNode* pSub = NULL;
  FOREACH(pSub, pNode->pChildren) {
    SJoinLogicNode* pHit = sigTbJoinSplMatchByNode((SLogicNode*)pSub);
    if (NULL != pHit) {
      return pHit;
    }
  }
  return NULL;
}

653
// splMatch() callback for the single-table-join rule: on a match records the join,
// its child at index 1 as the node to split off, and the owning subplan.
// pInfo is left untouched when nothing matches.
static bool sigTbJoinSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SSigTbJoinSplitInfo* pInfo) {
  SJoinLogicNode* pMatched = sigTbJoinSplMatchByNode(pSubplan->pNode);
  if (NULL == pMatched) {
    return false;
  }
  pInfo->pJoin = pMatched;
  pInfo->pSplitNode = (SLogicNode*)nodesListGetNode(pMatched->node.pChildren, 1);
  pInfo->pSubplan = pSubplan;
  return true;
}

X
Xiaoyu Wang 已提交
663 664 665
// Single-table-join split rule.
// When a matching join is found, its child at index 1 is replaced by an exchange
// node (the owning subplan keeps its type) and moved into a new scan subplan.
// After a match the context's groupId is advanced and pCxt->split is set, even if a
// step failed; the failing step's code is returned.
static int32_t singleTableJoinSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SSigTbJoinSplitInfo info = {0};
  bool found = splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)sigTbJoinSplFindSplitNode, &info);
  if (!found) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t rc = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType);
  if (TSDB_CODE_SUCCESS == rc) {
    rc = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, info.pSplitNode, 0));
  }
  pCxt->groupId += 1;
  pCxt->split = true;
  return rc;
}

677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710
// Returns true when the logic tree rooted at pLogicNode contains an exchange node
// whose srcGroupId equals groupId. The search does not descend below an exchange
// node: an exchange with a different group id makes that branch report false.
static bool unionIsChildSubplan(SLogicNode* pLogicNode, int32_t groupId) {
  if (QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(pLogicNode)) {
    return ((SExchangeLogicNode*)pLogicNode)->srcGroupId == groupId;
  }

  SNode* pSub = NULL;
  FOREACH(pSub, pLogicNode->pChildren) {
    if (unionIsChildSubplan((SLogicNode*)pSub, groupId)) {
      return true;
    }
  }
  return false;
}

// Moves every subplan in pChildren whose groupId is referenced by an exchange node
// inside pParent's logic tree into pParent->pChildren.
// Stops and returns the error code as soon as an append fails; subplans not moved
// stay in pChildren.
static int32_t unionMountSubplan(SLogicSubplan* pParent, SNodeList* pChildren) {
  SNode* pChild = NULL;
  WHERE_EACH(pChild, pChildren) {
    if (unionIsChildSubplan(pParent->pNode, ((SLogicSubplan*)pChild)->id.groupId)) {
      int32_t code = nodesListMakeAppend(&pParent->pChildren, pChild);
      if (TSDB_CODE_SUCCESS == code) {
        // NOTE(review): presumably the cell is cleared first so that erasing it does
        // not free the subplan that was just moved — confirm against the macros.
        REPLACE_NODE(NULL);
        ERASE_NODE(pChildren);
        continue;
      } else {
        return code;
      }
    }
    WHERE_NEXT;
  }
  return TSDB_CODE_SUCCESS;
}

// Creates a SUBPLAN_TYPE_SCAN subplan rooted at pNode with the context's queryId and
// current groupId, detaching pNode from its parent. Returns NULL when the subplan
// node cannot be allocated.
static SLogicSubplan* unionCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode) {
  SLogicSubplan* pBranch = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
  if (NULL != pBranch) {
    pBranch->id.queryId = pCxt->queryId;
    pBranch->id.groupId = pCxt->groupId;
    pBranch->subplanType = SUBPLAN_TYPE_SCAN;
    pBranch->pNode = pNode;
    pNode->pParent = NULL;
  }
  return pBranch;
}

// Turns every child of pSplitNode into its own subplan under pUnionSubplan.
// The union subplan's previous child subplans are detached first and re-mounted
// below whichever new subplan references them through an exchange node.
//
// The detached list is always destroyed before returning: on success it only holds
// whatever was not re-mounted, and on failure it would otherwise be orphaned because
// no other pointer to it exists. pSplitNode's child list is released only on
// success, when every cell has been cleared.
static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubplan, SLogicNode* pSplitNode) {
  SNodeList* pSubplanChildren = pUnionSubplan->pChildren;
  pUnionSubplan->pChildren = NULL;

  int32_t code = TSDB_CODE_SUCCESS;

  SNode* pChild = NULL;
  FOREACH(pChild, pSplitNode->pChildren) {
    SLogicSubplan* pNewSubplan = unionCreateSubplan(pCxt, (SLogicNode*)pChild);
    code = nodesListMakeStrictAppend(&pUnionSubplan->pChildren, (SNode*)pNewSubplan);
    if (TSDB_CODE_SUCCESS == code) {
      // The child now belongs to the new subplan; clear the cell in pSplitNode.
      REPLACE_NODE(NULL);
      code = unionMountSubplan(pNewSubplan, pSubplanChildren);
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }
  nodesDestroyList(pSubplanChildren);
  if (TSDB_CODE_SUCCESS == code) {
    DESTORY_LIST(pSplitNode->pChildren);
  }
  return code;
}

X
Xiaoyu Wang 已提交
748 749 750 751 752
// Result of the UNION ALL split matcher.
typedef struct SUnionAllSplitInfo {
  SProjectLogicNode* pProject;  // project node that has more than one child
  SLogicSubplan*     pSubplan;  // subplan whose tree contains pProject
} SUnionAllSplitInfo;

X
Xiaoyu Wang 已提交
753
// Pre-order search of the logic tree rooted at pNode for the first project node
// that has more than one child. Returns NULL when there is none.
static SLogicNode* unAllSplMatchByNode(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
    return pNode;
  }

  SNode* pSub = NULL;
  FOREACH(pSub, pNode->pChildren) {
    SLogicNode* pHit = unAllSplMatchByNode((SLogicNode*)pSub);
    if (NULL != pHit) {
      return pHit;
    }
  }
  return NULL;
}

767
// splMatch() callback for the UNION ALL rule: on a match records the project node
// and the owning subplan in pInfo. pInfo is left untouched when nothing matches.
static bool unAllSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SUnionAllSplitInfo* pInfo) {
  SLogicNode* pFound = unAllSplMatchByNode(pSubplan->pNode);
  if (NULL == pFound) {
    return false;
  }
  pInfo->pProject = (SProjectLogicNode*)pFound;
  pInfo->pSubplan = pSubplan;
  return true;
}

X
Xiaoyu Wang 已提交
776
// Replaces pProject with a new exchange node inside pSubplan.
//   pCxt     - split context; its current groupId becomes the exchange's srcGroupId
//   pSubplan - subplan that owns pProject; its type is switched to SUBPLAN_TYPE_MERGE
//   pProject - project node to replace; destroyed once the exchange has taken its place
// Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_OUT_OF_MEMORY when the exchange node
// or its target list cannot be allocated, and TSDB_CODE_PLAN_INTERNAL_ERROR when
// pProject is not found among its parent's children.
static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SProjectLogicNode* pProject) {
  SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pExchange->srcGroupId = pCxt->groupId;
  pExchange->node.precision = pProject->node.precision;
  pExchange->node.pTargets = nodesCloneList(pProject->node.pTargets);
  if (NULL == pExchange->node.pTargets) {
    // The exchange node is not attached anywhere yet, so it must be released here.
    nodesDestroyNode((SNode*)pExchange);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pSubplan->subplanType = SUBPLAN_TYPE_MERGE;

  // The project node is the root of the subplan: the exchange becomes the new root.
  if (NULL == pProject->node.pParent) {
    pSubplan->pNode = (SLogicNode*)pExchange;
    nodesDestroyNode((SNode*)pProject);
    return TSDB_CODE_SUCCESS;
  }

  // Otherwise swap the exchange into the parent's child slot that holds the project node.
  SNode* pNode;
  FOREACH(pNode, pProject->node.pParent->pChildren) {
    if (nodesEqualNode(pNode, (SNode*)pProject)) {
      REPLACE_NODE(pExchange);
      nodesDestroyNode(pNode);
      return TSDB_CODE_SUCCESS;
    }
  }

  // No slot matched: the exchange was never attached, so release it before failing.
  nodesDestroyNode((SNode*)pExchange);
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
}

X
Xiaoyu Wang 已提交
808 809
// UnionAllSplit rule. Looks for a project node with several children via splMatch;
// when none is found, returns TSDB_CODE_SUCCESS without touching pCxt. Otherwise it
// splits the children into subplans, replaces the project node with an exchange node,
// advances pCxt->groupId and sets pCxt->split, then returns the helpers' status code.
static int32_t unionAllSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SUnionAllSplitInfo info = {0};
  bool               matched = splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unAllSplFindSplitNode, &info);
  if (!matched) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pProject);
  if (TSDB_CODE_SUCCESS == code) {
    code = unAllSplCreateExchangeNode(pCxt, info.pSubplan, info.pProject);
  }

  // The group id and the split flag are updated whether or not the helpers succeeded.
  pCxt->groupId += 1;
  pCxt->split = true;
  return code;
}

X
Xiaoyu Wang 已提交
823 824 825 826 827
// Match result filled in by unDistSplFindSplitNode for the UnionDistinctSplit rule.
typedef struct SUnionDistinctSplitInfo {
  SAggLogicNode* pAgg;      // aggregate node with more than one child that was matched
  SLogicSubplan* pSubplan;  // subplan in which the aggregate node was found
} SUnionDistinctSplitInfo;

X
Xiaoyu Wang 已提交
828
// Depth-first search, starting at pNode, for the first aggregate node that has more
// than one child. Returns that node, or NULL when the tree contains none.
static SLogicNode* unDistSplMatchByNode(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode)) {
    if (LIST_LENGTH(pNode->pChildren) > 1) {
      return pNode;
    }
  }

  SNode* pSub;
  FOREACH(pSub, pNode->pChildren) {
    SLogicNode* pHit = unDistSplMatchByNode((SLogicNode*)pSub);
    if (NULL == pHit) {
      continue;
    }
    return pHit;
  }
  return NULL;
}

X
Xiaoyu Wang 已提交
842
// Creates an exchange node and appends it to pAgg's child list.
//   pCxt     - split context; its current groupId becomes the exchange's srcGroupId
//   pSubplan - subplan that owns pAgg; its type is switched to SUBPLAN_TYPE_MERGE
//   pAgg     - aggregate node; its group keys are cloned as the exchange's targets
// Returns TSDB_CODE_OUT_OF_MEMORY when the exchange node or its target list cannot be
// allocated, otherwise the result of appending the exchange to pAgg's children.
static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SAggLogicNode* pAgg) {
  SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pExchange->srcGroupId = pCxt->groupId;
  pExchange->node.precision = pAgg->node.precision;
  pExchange->node.pTargets = nodesCloneList(pAgg->pGroupKeys);
  if (NULL == pExchange->node.pTargets) {
    // The exchange node is not attached anywhere yet, so it must be released here.
    nodesDestroyNode((SNode*)pExchange);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pSubplan->subplanType = SUBPLAN_TYPE_MERGE;

  // NOTE(review): if nodesListMakeAppend does not take ownership on failure, pExchange
  // leaks on that path as well — confirm and consider the Strict variant.
  return nodesListMakeAppend(&pAgg->node.pChildren, (SNode*)pExchange);
}

859
// Finder callback for the UnionDistinctSplit rule (passed to splMatch by
// unionDistinctSplit). Searches pSubplan's node tree with unDistSplMatchByNode; on a
// hit, records the aggregate node and the subplan in pInfo and returns true, otherwise
// returns false and leaves pInfo untouched. pCxt is not used here.
static bool unDistSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SUnionDistinctSplitInfo* pInfo) {
  SLogicNode* pMatched = unDistSplMatchByNode(pSubplan->pNode);
  bool        found = (NULL != pMatched);
  if (found) {
    pInfo->pSubplan = pSubplan;
    pInfo->pAgg = (SAggLogicNode*)pMatched;
  }
  return found;
}

X
Xiaoyu Wang 已提交
868 869
// UnionDistinctSplit rule. Looks for an aggregate node with several children via
// splMatch; when none is found, returns TSDB_CODE_SUCCESS without touching pCxt.
// Otherwise it splits the children into subplans, appends an exchange node to the
// aggregate node, advances pCxt->groupId and sets pCxt->split, then returns the
// helpers' status code.
static int32_t unionDistinctSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SUnionDistinctSplitInfo info = {0};
  bool                    matched = splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unDistSplFindSplitNode, &info);
  if (!matched) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pAgg);
  if (TSDB_CODE_SUCCESS == code) {
    code = unDistSplCreateExchangeNode(pCxt, info.pSubplan, info.pAgg);
  }

  // The group id and the split flag are updated whether or not the helpers succeeded.
  pCxt->groupId += 1;
  pCxt->split = true;
  return code;
}

X
Xiaoyu Wang 已提交
883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925
// Match result filled in by smaIdxSplFindSplitNode for the SmaIndexSplit rule.
typedef struct SSmaIndexSplitInfo {
  SMergeLogicNode* pMerge;    // merge node with more than one child that was matched
  SLogicSubplan*   pSubplan;  // subplan in which the merge node was found
} SSmaIndexSplitInfo;

// Depth-first search, starting at pNode, for the first merge node that has more than
// one child. Returns that node, or NULL when the tree contains none.
static SLogicNode* smaIdxSplMatchByNode(SLogicNode* pNode) {
  bool isMerge = (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pNode));
  if (isMerge && LIST_LENGTH(pNode->pChildren) > 1) {
    return pNode;
  }

  SLogicNode* pFound = NULL;
  SNode*      pSub;
  FOREACH(pSub, pNode->pChildren) {
    pFound = smaIdxSplMatchByNode((SLogicNode*)pSub);
    if (NULL != pFound) {
      break;
    }
  }
  return pFound;
}

// Finder callback for the SmaIndexSplit rule (passed to splMatch by smaIndexSplit).
// Searches pSubplan's node tree with smaIdxSplMatchByNode; on a hit, records the merge
// node and the subplan in pInfo and returns true, otherwise returns false and leaves
// pInfo untouched. pCxt is not used here.
static bool smaIdxSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SSmaIndexSplitInfo* pInfo) {
  SLogicNode* pMatched = smaIdxSplMatchByNode(pSubplan->pNode);
  if (NULL == pMatched) {
    return false;
  }

  pInfo->pMerge = (SMergeLogicNode*)pMatched;
  pInfo->pSubplan = pSubplan;
  return true;
}

// SmaIndexSplit rule. Looks for a merge node with several children via splMatch; when
// none is found, returns TSDB_CODE_SUCCESS without touching pCxt. Otherwise it splits
// the children into subplans and, on success, stores the current group id in the merge
// node. pCxt->groupId is advanced and pCxt->split is set in either case.
static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SSmaIndexSplitInfo info = {0};
  bool               matched = splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)smaIdxSplFindSplitNode, &info);
  if (!matched) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pMerge);
  if (TSDB_CODE_SUCCESS == code) {
    // Record the group id before it is advanced below.
    info.pMerge->srcGroupId = pCxt->groupId;
  }

  pCxt->groupId += 1;
  pCxt->split = true;
  return code;
}

X
Xiaoyu Wang 已提交
926 927 928 929 930
// clang-format off
static const SSplitRule splitRuleSet[] = {
  {.pName = "SuperTableSplit",      .splitFunc = stableSplit},
  {.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit},
  {.pName = "UnionAllSplit",        .splitFunc = unionAllSplit},
X
Xiaoyu Wang 已提交
931 932
  {.pName = "UnionDistinctSplit",   .splitFunc = unionDistinctSplit},
  {.pName = "SmaIndexSplit",        .splitFunc = smaIndexSplit}
X
Xiaoyu Wang 已提交
933 934
};
// clang-format on
X
Xiaoyu Wang 已提交
935 936 937

static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));

938 939
// Logs, via qDebugL, the name of the rule that was just applied together with the
// subplan serialized by nodesNodeToString.
//   pRuleName - name of the split rule that produced the change
//   pSubplan  - subplan to serialize into the log line
static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
  char* pStr = NULL;
  nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL);
  // pStr stays NULL when serialization produced nothing; passing NULL to "%s" is
  // undefined behaviour, so substitute a printable placeholder.
  qDebugL("apply %s rule: %s", pRuleName, (NULL != pStr) ? pStr : "(null)");
  taosMemoryFree(pStr);
}

945 946 947 948
// Runs every rule in splitRuleSet against pSubplan, repeating full passes until a pass
// completes in which no rule reports a split.
//   pCxt     - planner context stored in the split context handed to each rule
//   pSubplan - subplan to split; its id seeds the split context (group id starts at
//              pSubplan->id.groupId + 1)
// Returns TSDB_CODE_SUCCESS, or the first non-success code returned by a rule.
static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) {
  SSplitContext cxt = {
      .pPlanCxt = pCxt, .queryId = pSubplan->id.queryId, .groupId = pSubplan->id.groupId + 1, .split = false};

  bool changedInPass;
  do {
    changedInPass = false;
    for (int32_t i = 0; i < splitRuleNum; ++i) {
      const SSplitRule* pRule = &splitRuleSet[i];

      // Each rule reports through cxt.split whether it changed the plan.
      cxt.split = false;
      int32_t code = pRule->splitFunc(&cxt, pSubplan);
      if (TSDB_CODE_SUCCESS != code) {
        return code;
      }
      if (!cxt.split) {
        continue;
      }
      changedInPass = true;
      dumpLogicSubplan(pRule->pName, pSubplan);
    }
  } while (changedInPass);

  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
965

X
Xiaoyu Wang 已提交
966 967 968 969 970 971 972 973 974 975
// Walks the tree under pNode; at each scan node reached, swaps the scan node's vgroup
// list with pSubplan's and does not descend further. Non-scan nodes just recurse into
// their children.
static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) {
    SNode* pSub;
    FOREACH(pSub, pNode->pChildren) {
      setVgroupsInfo((SLogicNode*)pSub, pSubplan);
    }
    return;
  }

  TSWAP(((SScanLogicNode*)pNode)->pVgroupList, pSubplan->pVgroupList);
}

X
Xiaoyu Wang 已提交
976 977 978 979
// Entry point of the splitter.
//   pCxt          - planner context
//   pLogicSubplan - root logic subplan to process
// When the root node is a vnode-modify node, only the vgroup lists are exchanged via
// setVgroupsInfo and TSDB_CODE_SUCCESS is returned; every other plan goes through
// applySplitRule, whose status code is returned.
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
  if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY != nodeType(pLogicSubplan->pNode)) {
    return applySplitRule(pCxt, pLogicSubplan);
  }

  setVgroupsInfo(pLogicSubplan->pNode, pLogicSubplan);
  return TSDB_CODE_SUCCESS;
}