planSpliter.c 32.7 KB
Newer Older
X
Xiaoyu Wang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

X
Xiaoyu Wang 已提交
16
#include "functionMgt.h"
X
Xiaoyu Wang 已提交
17
#include "planInt.h"
X
Xiaoyu Wang 已提交
18

X
Xiaoyu Wang 已提交
19
#define SPLIT_FLAG_MASK(n) (1 << n)
X
Xiaoyu Wang 已提交
20

X
Xiaoyu Wang 已提交
21
#define SPLIT_FLAG_STABLE_SPLIT SPLIT_FLAG_MASK(0)
X
Xiaoyu Wang 已提交
22

X
Xiaoyu Wang 已提交
23
#define SPLIT_FLAG_SET_MASK(val, mask)  (val) |= (mask)
X
Xiaoyu Wang 已提交
24 25 26
#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)

typedef struct SSplitContext {
27 28 29 30
  SPlanContext* pPlanCxt;
  uint64_t      queryId;
  int32_t       groupId;
  bool          split;
X
Xiaoyu Wang 已提交
31 32
} SSplitContext;

33
typedef int32_t (*FSplit)(SSplitContext* pCxt, SLogicSubplan* pSubplan);
X
Xiaoyu Wang 已提交
34 35

typedef struct SSplitRule {
X
Xiaoyu Wang 已提交
36
  char*  pName;
X
Xiaoyu Wang 已提交
37 38 39
  FSplit splitFunc;
} SSplitRule;

40
typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, void* pInfo);
X
Xiaoyu Wang 已提交
41

X
Xiaoyu Wang 已提交
42 43 44 45 46 47 48 49 50 51 52
static void splSetSubplanVgroups(SLogicSubplan* pSubplan, SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pNode)->pVgroupList);
  } else {
    if (1 == LIST_LENGTH(pNode->pChildren)) {
      splSetSubplanVgroups(pSubplan, (SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
    }
  }
}

static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SLogicNode* pNode, int32_t flag) {
53
  SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
X
Xiaoyu Wang 已提交
54 55 56
  if (NULL == pSubplan) {
    return NULL;
  }
57
  pSubplan->id.queryId = pCxt->queryId;
X
Xiaoyu Wang 已提交
58 59
  pSubplan->id.groupId = pCxt->groupId;
  pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
X
Xiaoyu Wang 已提交
60 61 62
  pSubplan->pNode = pNode;
  pSubplan->pNode->pParent = NULL;
  splSetSubplanVgroups(pSubplan, pNode);
63
  SPLIT_FLAG_SET_MASK(pSubplan->splitFlag, flag);
X
Xiaoyu Wang 已提交
64 65 66
  return pSubplan;
}

67
static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SExchangeLogicNode** pOutput) {
68
  SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
X
Xiaoyu Wang 已提交
69 70 71 72
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pExchange->srcGroupId = pCxt->groupId;
73 74
  pExchange->node.precision = pChild->precision;
  pExchange->node.pTargets = nodesCloneList(pChild->pTargets);
X
Xiaoyu Wang 已提交
75 76 77 78
  if (NULL == pExchange->node.pTargets) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

79 80 81 82
  *pOutput = pExchange;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
83 84 85 86 87
static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
                                               ESubplanType subplanType) {
  SExchangeLogicNode* pExchange = NULL;
  int32_t             code = splCreateExchangeNode(pCxt, pSplitNode, &pExchange);
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
88
    code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pExchange);
X
Xiaoyu Wang 已提交
89 90 91 92
  }
  if (TSDB_CODE_SUCCESS == code) {
    pSubplan->subplanType = subplanType;
  } else {
93
    nodesDestroyNode((SNode*)pExchange);
X
Xiaoyu Wang 已提交
94 95
  }
  return code;
X
Xiaoyu Wang 已提交
96 97
}

98 99
static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag, FSplFindSplitNode func, void* pInfo) {
  if (!SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, flag)) {
100
    if (func(pCxt, pSubplan, pInfo)) {
101 102 103 104 105 106 107 108 109 110 111 112
      return true;
    }
  }
  SNode* pChild;
  FOREACH(pChild, pSubplan->pChildren) {
    if (splMatch(pCxt, (SLogicSubplan*)pChild, flag, func, pInfo)) {
      return true;
    }
  }
  return false;
}

X
Xiaoyu Wang 已提交
113
typedef struct SStableSplitInfo {
X
Xiaoyu Wang 已提交
114 115
  SLogicNode*    pSplitNode;
  SLogicSubplan* pSubplan;
X
Xiaoyu Wang 已提交
116 117
} SStableSplitInfo;

X
Xiaoyu Wang 已提交
118 119 120
static bool stbSplHasGatherExecFunc(const SNodeList* pFuncs) {
  SNode* pFunc = NULL;
  FOREACH(pFunc, pFuncs) {
121 122
    if (!fmIsWindowPseudoColumnFunc(((SFunctionNode*)pFunc)->funcId) &&
        !fmIsDistExecFunc(((SFunctionNode*)pFunc)->funcId)) {
X
Xiaoyu Wang 已提交
123 124 125 126 127 128
      return true;
    }
  }
  return false;
}

129 130
static bool stbSplIsMultiTbScan(bool streamQuery, SScanLogicNode* pScan) {
  return (NULL != pScan->pVgroupList && pScan->pVgroupList->numOfVgroups > 1) ||
X
Xiaoyu Wang 已提交
131
         (streamQuery && TSDB_SUPER_TABLE == pScan->tableType);
X
Xiaoyu Wang 已提交
132 133
}

134
static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) {
X
Xiaoyu Wang 已提交
135 136 137 138
  if (1 != LIST_LENGTH(pNode->pChildren)) {
    return false;
  }
  SNode* pChild = nodesListGetNode(pNode->pChildren, 0);
139
  return (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild));
X
Xiaoyu Wang 已提交
140 141
}

142
static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
X
Xiaoyu Wang 已提交
143
  switch (nodeType(pNode)) {
X
Xiaoyu Wang 已提交
144 145
    case QUERY_NODE_LOGIC_PLAN_AGG:
      return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
146 147 148 149 150 151 152
    case QUERY_NODE_LOGIC_PLAN_WINDOW: {
      SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode;
      if (WINDOW_TYPE_INTERVAL != pWindow->winType) {
        return false;
      }
      return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
    }
153 154
    case QUERY_NODE_LOGIC_PLAN_SORT:
      return stbSplHasMultiTbScan(streamQuery, pNode);
X
Xiaoyu Wang 已提交
155
    case QUERY_NODE_LOGIC_PLAN_SCAN:
156
      return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
X
Xiaoyu Wang 已提交
157 158 159 160 161 162
    default:
      break;
  }
  return false;
}

163 164
static SLogicNode* stbSplMatchByNode(bool streamQuery, SLogicNode* pNode) {
  if (stbSplNeedSplit(streamQuery, pNode)) {
165 166 167 168
    return pNode;
  }
  SNode* pChild;
  FOREACH(pChild, pNode->pChildren) {
169
    SLogicNode* pSplitNode = stbSplMatchByNode(streamQuery, (SLogicNode*)pChild);
170 171 172 173 174 175 176
    if (NULL != pSplitNode) {
      return pSplitNode;
    }
  }
  return NULL;
}

177 178
static bool stbSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SStableSplitInfo* pInfo) {
  SLogicNode* pSplitNode = stbSplMatchByNode(pCxt->pPlanCxt->streamQuery, pSubplan->pNode);
179
  if (NULL != pSplitNode) {
X
Xiaoyu Wang 已提交
180
    pInfo->pSplitNode = pSplitNode;
181 182 183 184 185
    pInfo->pSubplan = pSubplan;
  }
  return NULL != pSplitNode;
}

X
Xiaoyu Wang 已提交
186 187 188 189 190 191 192 193
static int32_t stbSplRewriteFuns(const SNodeList* pFuncs, SNodeList** pPartialFuncs, SNodeList** pMergeFuncs) {
  SNode* pNode = NULL;
  FOREACH(pNode, pFuncs) {
    SFunctionNode* pFunc = (SFunctionNode*)pNode;
    SFunctionNode* pPartFunc = NULL;
    SFunctionNode* pMergeFunc = NULL;
    int32_t        code = TSDB_CODE_SUCCESS;
    if (fmIsWindowPseudoColumnFunc(pFunc->funcId)) {
194 195
      pPartFunc = (SFunctionNode*)nodesCloneNode(pNode);
      pMergeFunc = (SFunctionNode*)nodesCloneNode(pNode);
X
Xiaoyu Wang 已提交
196
      if (NULL == pPartFunc || NULL == pMergeFunc) {
197 198
        nodesDestroyNode((SNode*)pPartFunc);
        nodesDestroyNode((SNode*)pMergeFunc);
X
Xiaoyu Wang 已提交
199 200 201 202 203 204
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    } else {
      code = fmGetDistMethod(pFunc, &pPartFunc, &pMergeFunc);
    }
    if (TSDB_CODE_SUCCESS == code) {
205
      code = nodesListMakeStrictAppend(pPartialFuncs, (SNode*)pPartFunc);
X
Xiaoyu Wang 已提交
206 207
    }
    if (TSDB_CODE_SUCCESS == code) {
208
      code = nodesListMakeStrictAppend(pMergeFuncs, (SNode*)pMergeFunc);
X
Xiaoyu Wang 已提交
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
    }
    if (TSDB_CODE_SUCCESS != code) {
      nodesDestroyList(*pPartialFuncs);
      nodesDestroyList(*pMergeFuncs);
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

static int32_t stbSplAppendWStart(SNodeList* pFuncs, int32_t* pIndex) {
  int32_t index = 0;
  SNode*  pFunc = NULL;
  FOREACH(pFunc, pFuncs) {
    if (FUNCTION_TYPE_WSTARTTS == ((SFunctionNode*)pFunc)->funcType) {
      *pIndex = index;
      return TSDB_CODE_SUCCESS;
    }
    ++index;
  }

230
  SFunctionNode* pWStart = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
X
Xiaoyu Wang 已提交
231 232 233 234 235 236 237
  if (NULL == pWStart) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  strcpy(pWStart->functionName, "_wstartts");
  snprintf(pWStart->node.aliasName, sizeof(pWStart->node.aliasName), "%s.%p", pWStart->functionName, pWStart);
  int32_t code = fmGetFuncInfo(pWStart, NULL, 0);
  if (TSDB_CODE_SUCCESS == code) {
238
    code = nodesListStrictAppend(pFuncs, (SNode*)pWStart);
X
Xiaoyu Wang 已提交
239 240 241 242 243 244 245 246 247 248 249 250 251 252
  }
  *pIndex = index;
  return code;
}

static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogicNode** pPartWindow) {
  SNodeList* pFunc = pMergeWindow->pFuncs;
  pMergeWindow->pFuncs = NULL;
  SNodeList* pTargets = pMergeWindow->node.pTargets;
  pMergeWindow->node.pTargets = NULL;
  SNodeList* pChildren = pMergeWindow->node.pChildren;
  pMergeWindow->node.pChildren = NULL;

  int32_t           code = TSDB_CODE_SUCCESS;
253
  SWindowLogicNode* pPartWin = (SWindowLogicNode*)nodesCloneNode((SNode*)pMergeWindow);
X
Xiaoyu Wang 已提交
254 255 256 257 258 259 260 261 262 263 264 265 266 267
  if (NULL == pPartWin) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }

  if (TSDB_CODE_SUCCESS == code) {
    pMergeWindow->node.pTargets = pTargets;
    pPartWin->node.pChildren = pChildren;
    code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, &pMergeWindow->pFuncs);
  }
  int32_t index = 0;
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplAppendWStart(pPartWin->pFuncs, &index);
  }
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
268
    code = createColumnByRewriteExprs(pPartWin->pFuncs, &pPartWin->node.pTargets);
X
Xiaoyu Wang 已提交
269 270 271 272 273 274 275 276 277 278 279 280 281
  }
  if (TSDB_CODE_SUCCESS == code) {
    nodesDestroyNode(pMergeWindow->pTspk);
    pMergeWindow->pTspk = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index));
    if (NULL == pMergeWindow->pTspk) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  nodesDestroyList(pFunc);
  if (TSDB_CODE_SUCCESS == code) {
    *pPartWindow = (SLogicNode*)pPartWin;
  } else {
282
    nodesDestroyNode((SNode*)pPartWin);
X
Xiaoyu Wang 已提交
283 284 285 286 287
  }

  return code;
}

X
Xiaoyu Wang 已提交
288 289
static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
                                     SNodeList* pMergeKeys, SLogicNode* pPartChild) {
290
  SMergeLogicNode* pMerge = (SMergeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE);
X
Xiaoyu Wang 已提交
291 292 293 294 295 296
  if (NULL == pMerge) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pMerge->numOfChannels = ((SScanLogicNode*)nodesListGetNode(pPartChild->pChildren, 0))->pVgroupList->numOfVgroups;
  pMerge->srcGroupId = pCxt->groupId;
  pMerge->node.precision = pPartChild->precision;
X
Xiaoyu Wang 已提交
297
  pMerge->pMergeKeys = pMergeKeys;
X
Xiaoyu Wang 已提交
298 299 300

  int32_t code = TSDB_CODE_SUCCESS;
  pMerge->pInputs = nodesCloneList(pPartChild->pTargets);
301 302 303 304 305 306
  // NULL == pSubplan means 'merge node' replaces 'split node'.
  if (NULL == pSubplan) {
    pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets);
  } else {
    pMerge->node.pTargets = nodesCloneList(pSplitNode->pTargets);
  }
X
Xiaoyu Wang 已提交
307 308 309 310 311
  if (NULL == pMerge->node.pTargets || NULL == pMerge->pInputs) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }
  if (TSDB_CODE_SUCCESS == code) {
    if (NULL == pSubplan) {
312
      code = nodesListMakeAppend(&pSplitNode->pChildren, (SNode*)pMerge);
X
Xiaoyu Wang 已提交
313
    } else {
X
Xiaoyu Wang 已提交
314
      code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge);
X
Xiaoyu Wang 已提交
315 316 317
    }
  }
  if (TSDB_CODE_SUCCESS != code) {
318
    nodesDestroyNode((SNode*)pMerge);
X
Xiaoyu Wang 已提交
319
  }
X
Xiaoyu Wang 已提交
320
  return code;
X
Xiaoyu Wang 已提交
321 322
}

X
Xiaoyu Wang 已提交
323 324 325 326
static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent, SLogicNode* pPartChild) {
  SExchangeLogicNode* pExchange = NULL;
  int32_t             code = splCreateExchangeNode(pCxt, pPartChild, &pExchange);
  if (TSDB_CODE_SUCCESS == code) {
327
    code = nodesListMakeAppend(&pParent->pChildren, (SNode*)pExchange);
X
Xiaoyu Wang 已提交
328 329 330 331
  }
  return code;
}

332
static int32_t stbSplCreateMergeKeysForInterval(SNode* pWStartTs, SNodeList** pMergeKeys) {
333
  SOrderByExprNode* pMergeKey = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
334 335 336 337 338
  if (NULL == pMergeKey) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pMergeKey->pExpr = nodesCloneNode(pWStartTs);
  if (NULL == pMergeKey->pExpr) {
339
    nodesDestroyNode((SNode*)pMergeKey);
340 341 342 343
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pMergeKey->order = ORDER_ASC;
  pMergeKey->nullOrder = NULL_ORDER_FIRST;
344
  return nodesListMakeStrictAppend(pMergeKeys, (SNode*)pMergeKey);
345 346
}

347
static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
X
Xiaoyu Wang 已提交
348 349 350
  SLogicNode* pPartWindow = NULL;
  int32_t     code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
351
    ((SWindowLogicNode*)pPartWindow)->intervalAlgo = INTERVAL_ALGO_HASH;
X
Xiaoyu Wang 已提交
352 353
    ((SWindowLogicNode*)pInfo->pSplitNode)->intervalAlgo = INTERVAL_ALGO_MERGE;
    SNodeList* pMergeKeys = NULL;
354
    code = stbSplCreateMergeKeysForInterval(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys);
X
Xiaoyu Wang 已提交
355 356 357 358 359 360
    if (TSDB_CODE_SUCCESS == code) {
      code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow);
    }
    if (TSDB_CODE_SUCCESS != code) {
      nodesDestroyList(pMergeKeys);
    }
X
Xiaoyu Wang 已提交
361 362 363
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
364
                                     (SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
X
Xiaoyu Wang 已提交
365 366 367 368 369
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  return code;
}

370
static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
371 372 373
  SLogicNode* pPartWindow = NULL;
  int32_t     code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
374 375
    ((SWindowLogicNode*)pPartWindow)->intervalAlgo = INTERVAL_ALGO_STREAM_SEMI;
    ((SWindowLogicNode*)pInfo->pSplitNode)->intervalAlgo = INTERVAL_ALGO_STREAM_FINAL;
376 377 378 379
    code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
380
                                     (SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
381 382 383 384 385
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  return code;
}

386
static int32_t stbSplSplitInterval(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
387
  if (pCxt->pPlanCxt->streamQuery) {
388
    return stbSplSplitIntervalForStream(pCxt, pInfo);
389
  } else {
390
    return stbSplSplitIntervalForBatch(pCxt, pInfo);
391 392 393
  }
}

394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
static int32_t stbSplSplitSession(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
}

static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  switch (((SWindowLogicNode*)pInfo->pSplitNode)->winType) {
    case WINDOW_TYPE_INTERVAL:
      return stbSplSplitInterval(pCxt, pInfo);
    case WINDOW_TYPE_SESSION:
      return stbSplSplitSession(pCxt, pInfo);
    default:
      break;
  }
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
}

X
Xiaoyu Wang 已提交
410 411 412 413 414 415 416 417 418
static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) {
  SNodeList* pFunc = pMergeAgg->pAggFuncs;
  pMergeAgg->pAggFuncs = NULL;
  SNodeList* pGroupKeys = pMergeAgg->pGroupKeys;
  pMergeAgg->pGroupKeys = NULL;
  SNodeList* pTargets = pMergeAgg->node.pTargets;
  pMergeAgg->node.pTargets = NULL;
  SNodeList* pChildren = pMergeAgg->node.pChildren;
  pMergeAgg->node.pChildren = NULL;
X
Xiaoyu Wang 已提交
419 420
  SNode* pConditions = pMergeAgg->node.pConditions;
  pMergeAgg->node.pConditions = NULL;
X
Xiaoyu Wang 已提交
421 422

  int32_t        code = TSDB_CODE_SUCCESS;
423
  SAggLogicNode* pPartAgg = (SAggLogicNode*)nodesCloneNode((SNode*)pMergeAgg);
X
Xiaoyu Wang 已提交
424 425 426 427 428 429
  if (NULL == pPartAgg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }

  if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
    pPartAgg->pGroupKeys = pGroupKeys;
X
Xiaoyu Wang 已提交
430
    code = createColumnByRewriteExprs(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets);
X
Xiaoyu Wang 已提交
431 432 433 434 435 436 437 438
  }
  if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
    pMergeAgg->pGroupKeys = nodesCloneList(pPartAgg->node.pTargets);
    if (NULL == pMergeAgg->pGroupKeys) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
439
    pMergeAgg->node.pConditions = pConditions;
X
Xiaoyu Wang 已提交
440 441 442 443 444 445
    pMergeAgg->node.pTargets = pTargets;
    pPartAgg->node.pChildren = pChildren;

    code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, &pMergeAgg->pAggFuncs);
  }
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
446
    code = createColumnByRewriteExprs(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets);
X
Xiaoyu Wang 已提交
447 448 449 450 451 452
  }

  nodesDestroyList(pFunc);
  if (TSDB_CODE_SUCCESS == code) {
    *pOutput = (SLogicNode*)pPartAgg;
  } else {
453
    nodesDestroyNode((SNode*)pPartAgg);
X
Xiaoyu Wang 已提交
454 455 456 457 458 459 460 461 462 463 464 465 466
  }

  return code;
}

static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pPartAgg = NULL;
  int32_t     code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
467
                                     (SNode*)splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT));
X
Xiaoyu Wang 已提交
468 469 470 471 472
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  return code;
}

X
Xiaoyu Wang 已提交
473
static SNode* stbSplCreateColumnNode(SExprNode* pExpr) {
474
  SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
X
Xiaoyu Wang 已提交
475 476 477 478 479 480 481 482 483 484 485 486 487
  if (NULL == pCol) {
    return NULL;
  }
  if (QUERY_NODE_COLUMN == nodeType(pExpr)) {
    strcpy(pCol->tableAlias, ((SColumnNode*)pExpr)->tableAlias);
  }
  strcpy(pCol->colName, pExpr->aliasName);
  strcpy(pCol->node.aliasName, pExpr->aliasName);
  pCol->node.resType = pExpr->resType;
  return (SNode*)pCol;
}

static SNode* stbSplCreateOrderByExpr(SOrderByExprNode* pSortKey, SNode* pCol) {
488
  SOrderByExprNode* pOutput = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
X
Xiaoyu Wang 已提交
489 490 491 492 493
  if (NULL == pOutput) {
    return NULL;
  }
  pOutput->pExpr = nodesCloneNode(pCol);
  if (NULL == pOutput->pExpr) {
494
    nodesDestroyNode((SNode*)pOutput);
X
Xiaoyu Wang 已提交
495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545
    return NULL;
  }
  pOutput->order = pSortKey->order;
  pOutput->nullOrder = pSortKey->nullOrder;
  return (SNode*)pOutput;
}

static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets, SNodeList** pOutput) {
  int32_t    code = TSDB_CODE_SUCCESS;
  SNodeList* pMergeKeys = NULL;
  SNode*     pNode = NULL;
  FOREACH(pNode, pSortKeys) {
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)pNode;
    SNode*            pTarget = NULL;
    bool              found = false;
    FOREACH(pTarget, pTargets) {
      if (0 == strcmp(((SExprNode*)pSortKey->pExpr)->aliasName, ((SColumnNode*)pTarget)->colName)) {
        code = nodesListMakeStrictAppend(&pMergeKeys, stbSplCreateOrderByExpr(pSortKey, pTarget));
        if (TSDB_CODE_SUCCESS != code) {
          break;
        }
        found = true;
      }
    }
    if (TSDB_CODE_SUCCESS == code && !found) {
      SNode* pCol = stbSplCreateColumnNode((SExprNode*)pSortKey->pExpr);
      code = nodesListMakeStrictAppend(&pMergeKeys, stbSplCreateOrderByExpr(pSortKey, pCol));
      if (TSDB_CODE_SUCCESS == code) {
        code = nodesListStrictAppend(pTargets, pCol);
      } else {
        nodesDestroyNode(pCol);
      }
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    *pOutput = pMergeKeys;
  } else {
    nodesDestroyList(pMergeKeys);
  }
  return code;
}

static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOutputPartSort,
                                        SNodeList** pOutputMergeKeys) {
  SNodeList* pSortKeys = pSort->pSortKeys;
  pSort->pSortKeys = NULL;
  SNodeList* pChildren = pSort->node.pChildren;
  pSort->node.pChildren = NULL;
X
Xiaoyu Wang 已提交
546 547

  int32_t         code = TSDB_CODE_SUCCESS;
548
  SSortLogicNode* pPartSort = (SSortLogicNode*)nodesCloneNode((SNode*)pSort);
X
Xiaoyu Wang 已提交
549 550 551 552
  if (NULL == pPartSort) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }

X
Xiaoyu Wang 已提交
553
  SNodeList* pMergeKeys = NULL;
X
Xiaoyu Wang 已提交
554
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
555
    pPartSort->node.pChildren = pChildren;
X
Xiaoyu Wang 已提交
556
    pPartSort->pSortKeys = pSortKeys;
X
Xiaoyu Wang 已提交
557
    code = stbSplCreateMergeKeys(pPartSort->pSortKeys, pPartSort->node.pTargets, &pMergeKeys);
X
Xiaoyu Wang 已提交
558 559 560
  }

  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
561 562
    *pOutputPartSort = (SLogicNode*)pPartSort;
    *pOutputMergeKeys = pMergeKeys;
X
Xiaoyu Wang 已提交
563
  } else {
564
    nodesDestroyNode((SNode*)pPartSort);
X
Xiaoyu Wang 已提交
565
    nodesDestroyList(pMergeKeys);
X
Xiaoyu Wang 已提交
566 567 568 569 570 571 572
  }

  return code;
}

static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pPartSort = NULL;
X
Xiaoyu Wang 已提交
573 574
  SNodeList*  pMergeKeys = NULL;
  int32_t     code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort, &pMergeKeys);
X
Xiaoyu Wang 已提交
575
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
576
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pPartSort);
X
Xiaoyu Wang 已提交
577 578 579
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
580
                                     (SNode*)splCreateScanSubplan(pCxt, pPartSort, SPLIT_FLAG_STABLE_SPLIT));
X
Xiaoyu Wang 已提交
581 582 583 584 585
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  return code;
}

X
Xiaoyu Wang 已提交
586
static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
587
  int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE);
X
Xiaoyu Wang 已提交
588 589
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
590
                                     (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
X
Xiaoyu Wang 已提交
591 592 593 594
  }
  return code;
}

X
Xiaoyu Wang 已提交
595
static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
596 597 598 599
  if (pCxt->pPlanCxt->rSmaQuery) {
    return TSDB_CODE_SUCCESS;
  }

X
Xiaoyu Wang 已提交
600 601
  SStableSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_STABLE_SPLIT, (FSplFindSplitNode)stbSplFindSplitNode, &info)) {
602 603
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
604 605 606

  int32_t code = TSDB_CODE_SUCCESS;
  switch (nodeType(info.pSplitNode)) {
X
Xiaoyu Wang 已提交
607 608 609
    case QUERY_NODE_LOGIC_PLAN_AGG:
      code = stbSplSplitAggNode(pCxt, &info);
      break;
X
Xiaoyu Wang 已提交
610 611 612
    case QUERY_NODE_LOGIC_PLAN_WINDOW:
      code = stbSplSplitWindowNode(pCxt, &info);
      break;
X
Xiaoyu Wang 已提交
613 614 615
    case QUERY_NODE_LOGIC_PLAN_SORT:
      code = stbSplSplitSortNode(pCxt, &info);
      break;
X
Xiaoyu Wang 已提交
616 617 618 619 620
    case QUERY_NODE_LOGIC_PLAN_SCAN:
      code = stbSplSplitScanNode(pCxt, &info);
      break;
    default:
      break;
621
  }
X
Xiaoyu Wang 已提交
622

623 624 625 626 627
  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}

X
Xiaoyu Wang 已提交
628 629 630 631 632 633 634
typedef struct SSigTbJoinSplitInfo {
  SJoinLogicNode* pJoin;
  SLogicNode*     pSplitNode;
  SLogicSubplan*  pSubplan;
} SSigTbJoinSplitInfo;

static bool sigTbJoinSplNeedSplit(SJoinLogicNode* pJoin) {
X
Xiaoyu Wang 已提交
635 636 637 638 639
  if (!pJoin->isSingleTableJoin) {
    return false;
  }
  return QUERY_NODE_LOGIC_PLAN_EXCHANGE != nodeType(nodesListGetNode(pJoin->node.pChildren, 0)) &&
         QUERY_NODE_LOGIC_PLAN_EXCHANGE != nodeType(nodesListGetNode(pJoin->node.pChildren, 1));
640 641
}

X
Xiaoyu Wang 已提交
642 643
static SJoinLogicNode* sigTbJoinSplMatchByNode(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pNode) && sigTbJoinSplNeedSplit((SJoinLogicNode*)pNode)) {
X
Xiaoyu Wang 已提交
644
    return (SJoinLogicNode*)pNode;
645 646 647
  }
  SNode* pChild;
  FOREACH(pChild, pNode->pChildren) {
X
Xiaoyu Wang 已提交
648
    SJoinLogicNode* pSplitNode = sigTbJoinSplMatchByNode((SLogicNode*)pChild);
649 650 651 652 653 654 655
    if (NULL != pSplitNode) {
      return pSplitNode;
    }
  }
  return NULL;
}

656
static bool sigTbJoinSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SSigTbJoinSplitInfo* pInfo) {
X
Xiaoyu Wang 已提交
657
  SJoinLogicNode* pJoin = sigTbJoinSplMatchByNode(pSubplan->pNode);
X
Xiaoyu Wang 已提交
658 659
  if (NULL != pJoin) {
    pInfo->pJoin = pJoin;
660
    pInfo->pSplitNode = (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1);
661 662
    pInfo->pSubplan = pSubplan;
  }
X
Xiaoyu Wang 已提交
663
  return NULL != pJoin;
664 665
}

X
Xiaoyu Wang 已提交
666 667 668
static int32_t singleTableJoinSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SSigTbJoinSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)sigTbJoinSplFindSplitNode, &info)) {
X
Xiaoyu Wang 已提交
669 670
    return TSDB_CODE_SUCCESS;
  }
671
  int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType);
X
Xiaoyu Wang 已提交
672
  if (TSDB_CODE_SUCCESS == code) {
673
    code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, info.pSplitNode, 0));
X
Xiaoyu Wang 已提交
674 675 676 677 678 679
  }
  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}

680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713
static bool unionIsChildSubplan(SLogicNode* pLogicNode, int32_t groupId) {
  if (QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(pLogicNode)) {
    return ((SExchangeLogicNode*)pLogicNode)->srcGroupId == groupId;
  }

  SNode* pChild;
  FOREACH(pChild, pLogicNode->pChildren) {
    bool isChild = unionIsChildSubplan((SLogicNode*)pChild, groupId);
    if (isChild) {
      return isChild;
    }
  }
  return false;
}

static int32_t unionMountSubplan(SLogicSubplan* pParent, SNodeList* pChildren) {
  SNode* pChild = NULL;
  WHERE_EACH(pChild, pChildren) {
    if (unionIsChildSubplan(pParent->pNode, ((SLogicSubplan*)pChild)->id.groupId)) {
      int32_t code = nodesListMakeAppend(&pParent->pChildren, pChild);
      if (TSDB_CODE_SUCCESS == code) {
        REPLACE_NODE(NULL);
        ERASE_NODE(pChildren);
        continue;
      } else {
        return code;
      }
    }
    WHERE_NEXT;
  }
  return TSDB_CODE_SUCCESS;
}

static SLogicSubplan* unionCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode) {
714
  SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
715 716 717
  if (NULL == pSubplan) {
    return NULL;
  }
718
  pSubplan->id.queryId = pCxt->queryId;
719 720 721
  pSubplan->id.groupId = pCxt->groupId;
  pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
  pSubplan->pNode = pNode;
722
  pNode->pParent = NULL;
723 724 725 726 727 728 729 730 731 732 733 734
  return pSubplan;
}

static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubplan, SLogicNode* pSplitNode) {
  SNodeList* pSubplanChildren = pUnionSubplan->pChildren;
  pUnionSubplan->pChildren = NULL;

  int32_t code = TSDB_CODE_SUCCESS;

  SNode* pChild = NULL;
  FOREACH(pChild, pSplitNode->pChildren) {
    SLogicSubplan* pNewSubplan = unionCreateSubplan(pCxt, (SLogicNode*)pChild);
735
    code = nodesListMakeStrictAppend(&pUnionSubplan->pChildren, (SNode*)pNewSubplan);
736 737 738 739 740 741 742 743 744 745 746 747 748 749 750
    if (TSDB_CODE_SUCCESS == code) {
      REPLACE_NODE(NULL);
      code = unionMountSubplan(pNewSubplan, pSubplanChildren);
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    nodesDestroyList(pSubplanChildren);
    DESTORY_LIST(pSplitNode->pChildren);
  }
  return code;
}

X
Xiaoyu Wang 已提交
751 752 753 754 755
typedef struct SUnionAllSplitInfo {
  SProjectLogicNode* pProject;
  SLogicSubplan*     pSubplan;
} SUnionAllSplitInfo;

X
Xiaoyu Wang 已提交
756
static SLogicNode* unAllSplMatchByNode(SLogicNode* pNode) {
X
Xiaoyu Wang 已提交
757 758 759 760 761
  if (QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
    return pNode;
  }
  SNode* pChild;
  FOREACH(pChild, pNode->pChildren) {
X
Xiaoyu Wang 已提交
762
    SLogicNode* pSplitNode = unAllSplMatchByNode((SLogicNode*)pChild);
X
Xiaoyu Wang 已提交
763 764 765 766 767 768 769
    if (NULL != pSplitNode) {
      return pSplitNode;
    }
  }
  return NULL;
}

770
static bool unAllSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SUnionAllSplitInfo* pInfo) {
X
Xiaoyu Wang 已提交
771
  SLogicNode* pSplitNode = unAllSplMatchByNode(pSubplan->pNode);
X
Xiaoyu Wang 已提交
772 773 774 775 776 777 778
  if (NULL != pSplitNode) {
    pInfo->pProject = (SProjectLogicNode*)pSplitNode;
    pInfo->pSubplan = pSubplan;
  }
  return NULL != pSplitNode;
}

X
Xiaoyu Wang 已提交
779
static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SProjectLogicNode* pProject) {
780
  SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
X
Xiaoyu Wang 已提交
781 782 783 784
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pExchange->srcGroupId = pCxt->groupId;
X
Xiaoyu Wang 已提交
785
  pExchange->node.precision = pProject->node.precision;
X
Xiaoyu Wang 已提交
786 787 788 789 790 791 792
  pExchange->node.pTargets = nodesCloneList(pProject->node.pTargets);
  if (NULL == pExchange->node.pTargets) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pSubplan->subplanType = SUBPLAN_TYPE_MERGE;

X
Xiaoyu Wang 已提交
793 794
  if (NULL == pProject->node.pParent) {
    pSubplan->pNode = (SLogicNode*)pExchange;
795
    nodesDestroyNode((SNode*)pProject);
X
Xiaoyu Wang 已提交
796 797 798 799 800
    return TSDB_CODE_SUCCESS;
  }

  SNode* pNode;
  FOREACH(pNode, pProject->node.pParent->pChildren) {
801
    if (nodesEqualNode(pNode, (SNode*)pProject)) {
X
Xiaoyu Wang 已提交
802 803 804 805 806
      REPLACE_NODE(pExchange);
      nodesDestroyNode(pNode);
      return TSDB_CODE_SUCCESS;
    }
  }
807 808
  nodesDestroyNode((SNode*)pExchange);
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
X
Xiaoyu Wang 已提交
809 810
}

X
Xiaoyu Wang 已提交
811 812
static int32_t unionAllSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SUnionAllSplitInfo info = {0};
X
Xiaoyu Wang 已提交
813
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unAllSplFindSplitNode, &info)) {
814 815
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
816

817
  int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pProject);
X
Xiaoyu Wang 已提交
818
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
819
    code = unAllSplCreateExchangeNode(pCxt, info.pSubplan, info.pProject);
X
Xiaoyu Wang 已提交
820 821
  }
  ++(pCxt->groupId);
822
  pCxt->split = true;
X
Xiaoyu Wang 已提交
823 824 825
  return code;
}

X
Xiaoyu Wang 已提交
826 827 828 829 830
typedef struct SUnionDistinctSplitInfo {
  SAggLogicNode* pAgg;
  SLogicSubplan* pSubplan;
} SUnionDistinctSplitInfo;

X
Xiaoyu Wang 已提交
831
static SLogicNode* unDistSplMatchByNode(SLogicNode* pNode) {
X
Xiaoyu Wang 已提交
832 833 834 835 836
  if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
    return pNode;
  }
  SNode* pChild;
  FOREACH(pChild, pNode->pChildren) {
X
Xiaoyu Wang 已提交
837
    SLogicNode* pSplitNode = unDistSplMatchByNode((SLogicNode*)pChild);
X
Xiaoyu Wang 已提交
838 839 840 841 842 843 844
    if (NULL != pSplitNode) {
      return pSplitNode;
    }
  }
  return NULL;
}

X
Xiaoyu Wang 已提交
845
static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SAggLogicNode* pAgg) {
846
  SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
X
Xiaoyu Wang 已提交
847 848 849 850
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pExchange->srcGroupId = pCxt->groupId;
X
Xiaoyu Wang 已提交
851
  pExchange->node.precision = pAgg->node.precision;
X
Xiaoyu Wang 已提交
852
  pExchange->node.pTargets = nodesCloneList(pAgg->pGroupKeys);
X
Xiaoyu Wang 已提交
853 854 855 856 857 858
  if (NULL == pExchange->node.pTargets) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pSubplan->subplanType = SUBPLAN_TYPE_MERGE;

859
  return nodesListMakeAppend(&pAgg->node.pChildren, (SNode*)pExchange);
X
Xiaoyu Wang 已提交
860 861
}

862
static bool unDistSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SUnionDistinctSplitInfo* pInfo) {
X
Xiaoyu Wang 已提交
863
  SLogicNode* pSplitNode = unDistSplMatchByNode(pSubplan->pNode);
X
Xiaoyu Wang 已提交
864 865 866 867 868 869 870
  if (NULL != pSplitNode) {
    pInfo->pAgg = (SAggLogicNode*)pSplitNode;
    pInfo->pSubplan = pSubplan;
  }
  return NULL != pSplitNode;
}

X
Xiaoyu Wang 已提交
871 872
static int32_t unionDistinctSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SUnionDistinctSplitInfo info = {0};
X
Xiaoyu Wang 已提交
873
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unDistSplFindSplitNode, &info)) {
X
Xiaoyu Wang 已提交
874 875 876
    return TSDB_CODE_SUCCESS;
  }

877
  int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pAgg);
X
Xiaoyu Wang 已提交
878
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
879
    code = unDistSplCreateExchangeNode(pCxt, info.pSubplan, info.pAgg);
X
Xiaoyu Wang 已提交
880 881 882 883 884 885
  }
  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}

X
Xiaoyu Wang 已提交
886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928
typedef struct SSmaIndexSplitInfo {
  SMergeLogicNode* pMerge;
  SLogicSubplan*   pSubplan;
} SSmaIndexSplitInfo;

static SLogicNode* smaIdxSplMatchByNode(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
    return pNode;
  }
  SNode* pChild;
  FOREACH(pChild, pNode->pChildren) {
    SLogicNode* pSplitNode = smaIdxSplMatchByNode((SLogicNode*)pChild);
    if (NULL != pSplitNode) {
      return pSplitNode;
    }
  }
  return NULL;
}

static bool smaIdxSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SSmaIndexSplitInfo* pInfo) {
  SLogicNode* pSplitNode = smaIdxSplMatchByNode(pSubplan->pNode);
  if (NULL != pSplitNode) {
    pInfo->pMerge = (SMergeLogicNode*)pSplitNode;
    pInfo->pSubplan = pSubplan;
  }
  return NULL != pSplitNode;
}

static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SSmaIndexSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)smaIdxSplFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pMerge);
  if (TSDB_CODE_SUCCESS == code) {
    info.pMerge->srcGroupId = pCxt->groupId;
  }
  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}

X
Xiaoyu Wang 已提交
929 930 931 932 933
// clang-format off
static const SSplitRule splitRuleSet[] = {
  {.pName = "SuperTableSplit",      .splitFunc = stableSplit},
  {.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit},
  {.pName = "UnionAllSplit",        .splitFunc = unionAllSplit},
X
Xiaoyu Wang 已提交
934 935
  {.pName = "UnionDistinctSplit",   .splitFunc = unionDistinctSplit},
  {.pName = "SmaIndexSplit",        .splitFunc = smaIndexSplit}
X
Xiaoyu Wang 已提交
936 937
};
// clang-format on
X
Xiaoyu Wang 已提交
938 939 940

static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));

941 942
static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
  char* pStr = NULL;
943
  nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL);
944 945 946 947
  qDebugL("apply %s rule: %s", pRuleName, pStr);
  taosMemoryFree(pStr);
}

948 949 950 951
static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) {
  SSplitContext cxt = {
      .pPlanCxt = pCxt, .queryId = pSubplan->id.queryId, .groupId = pSubplan->id.groupId + 1, .split = false};
  bool split = false;
X
Xiaoyu Wang 已提交
952
  do {
953
    split = false;
X
Xiaoyu Wang 已提交
954
    for (int32_t i = 0; i < splitRuleNum; ++i) {
955
      cxt.split = false;
956
      int32_t code = splitRuleSet[i].splitFunc(&cxt, pSubplan);
X
Xiaoyu Wang 已提交
957 958 959
      if (TSDB_CODE_SUCCESS != code) {
        return code;
      }
960 961 962 963
      if (cxt.split) {
        split = true;
        dumpLogicSubplan(splitRuleSet[i].pName, pSubplan);
      }
X
Xiaoyu Wang 已提交
964
    }
965
  } while (split);
X
Xiaoyu Wang 已提交
966 967
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
968

X
Xiaoyu Wang 已提交
969 970 971 972 973 974 975 976 977 978
static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    TSWAP(((SScanLogicNode*)pNode)->pVgroupList, pSubplan->pVgroupList);
    return;
  }

  SNode* pChild;
  FOREACH(pChild, pNode->pChildren) { setVgroupsInfo((SLogicNode*)pChild, pSubplan); }
}

X
Xiaoyu Wang 已提交
979 980 981 982
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
  if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pLogicSubplan->pNode)) {
    setVgroupsInfo(pLogicSubplan->pNode, pLogicSubplan);
    return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
983
  }
X
Xiaoyu Wang 已提交
984 985
  return applySplitRule(pCxt, pLogicSubplan);
}