planSpliter.c 44.4 KB
Newer Older
X
Xiaoyu Wang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

X
Xiaoyu Wang 已提交
16
#include "functionMgt.h"
X
Xiaoyu Wang 已提交
17
#include "planInt.h"
X
Xiaoyu Wang 已提交
18
#include "tglobal.h"
X
Xiaoyu Wang 已提交
19

X
Xiaoyu Wang 已提交
20
// Builds a one-bit mask with bit n set. The argument is parenthesized so that an
// expression such as `a | b` is shifted as a whole.
#define SPLIT_FLAG_MASK(n) (1 << (n))

// Flag passed to splCreateScanSubplan by the stable split functions; splMatch does not
// search the node tree of a subplan that already carries the flag it is asked about.
#define SPLIT_FLAG_STABLE_SPLIT SPLIT_FLAG_MASK(0)

// Sets the bits of mask in val. The expansion is fully parenthesized so it stays a single
// expression wherever it is used.
#define SPLIT_FLAG_SET_MASK(val, mask)  ((val) |= (mask))
// Evaluates to true when val has at least one bit of mask set.
#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)

typedef struct SSplitContext {
28 29 30 31
  SPlanContext* pPlanCxt;
  uint64_t      queryId;
  int32_t       groupId;
  bool          split;
X
Xiaoyu Wang 已提交
32 33
} SSplitContext;

34
// Entry point of a split rule: receives the split context and a logic subplan and returns
// an int32_t status code.
typedef int32_t (*FSplit)(SSplitContext* pCxt, SLogicSubplan* pSubplan);
X
Xiaoyu Wang 已提交
35 36

typedef struct SSplitRule {
X
Xiaoyu Wang 已提交
37
  char*  pName;
X
Xiaoyu Wang 已提交
38 39 40
  FSplit splitFunc;
} SSplitRule;

X
Xiaoyu Wang 已提交
41 42
// Predicate used by splMatch/splMatchByNode: called for a logic node of a subplan, it returns
// true when that node is the one to split. pInfo is an opaque, rule-specific in/out argument.
typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, void* pInfo);
X
Xiaoyu Wang 已提交
43

X
Xiaoyu Wang 已提交
44 45 46 47 48 49 50 51 52 53 54
// Walks down from pNode through single-child nodes until a scan node is reached, then swaps
// that scan's vgroup list with the subplan's. Stops without doing anything as soon as a
// non-scan node has a child count other than one.
static void splSetSubplanVgroups(SLogicSubplan* pSubplan, SLogicNode* pNode) {
  SLogicNode* pCur = pNode;
  while (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pCur)) {
    if (1 != LIST_LENGTH(pCur->pChildren)) {
      return;
    }
    pCur = (SLogicNode*)nodesListGetNode(pCur->pChildren, 0);
  }
  TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pCur)->pVgroupList);
}

// Wraps pNode into a new scan-type logic subplan.
// The subplan gets the context's queryId and current groupId, pNode becomes its root (its
// parent pointer is cleared), the vgroup list of the scan below pNode is moved to the
// subplan, and `flag` is set in the subplan's split flags.
// Returns NULL when the subplan node cannot be allocated.
static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SLogicNode* pNode, int32_t flag) {
  SLogicSubplan* pScanSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
  if (NULL != pScanSubplan) {
    pScanSubplan->id.queryId = pCxt->queryId;
    pScanSubplan->id.groupId = pCxt->groupId;
    pScanSubplan->subplanType = SUBPLAN_TYPE_SCAN;
    pNode->pParent = NULL;
    pScanSubplan->pNode = pNode;
    splSetSubplanVgroups(pScanSubplan, pNode);
    SPLIT_FLAG_SET_MASK(pScanSubplan->splitFlag, flag);
  }
  return pScanSubplan;
}

69
// Creates an exchange logic node that stands in for pChild.
// The node uses the context's current groupId as its source group, copies pChild's
// precision and clones pChild's target list.
// Returns TSDB_CODE_SUCCESS and stores the node in *pOutput. Returns
// TSDB_CODE_OUT_OF_MEMORY when the node or the cloned target list cannot be created; in
// that case *pOutput is left untouched and the partially built node is released.
static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SExchangeLogicNode** pOutput) {
  SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pExchange->srcGroupId = pCxt->groupId;
  pExchange->node.precision = pChild->precision;
  pExchange->node.pTargets = nodesCloneList(pChild->pTargets);
  if (NULL == pExchange->node.pTargets) {
    // The node is never handed to the caller on this path, so it has to be freed here.
    nodesDestroyNode((SNode*)pExchange);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  *pOutput = pExchange;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
85 86 87 88 89
// Replaces pSplitNode inside pSubplan with a freshly created exchange node and, on success,
// switches the subplan to `subplanType`. On any failure the exchange node is destroyed and
// the error code is returned; the subplan type is left unchanged.
static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
                                               ESubplanType subplanType) {
  SExchangeLogicNode* pNewExchange = NULL;
  int32_t             code = splCreateExchangeNode(pCxt, pSplitNode, &pNewExchange);
  if (TSDB_CODE_SUCCESS == code) {
    code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pNewExchange);
  }
  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode((SNode*)pNewExchange);
    return code;
  }
  pSubplan->subplanType = subplanType;
  return code;
}

X
Xiaoyu Wang 已提交
100 101 102 103 104 105 106 107 108 109 110 111 112 113
// Depth-first, pre-order search of the logic node tree rooted at pNode.
// `func` is called on pNode first and then on each descendant; the search stops at the first
// node for which it returns true.
// Returns true when some node matched, false otherwise.
static bool splMatchByNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, FSplFindSplitNode func,
                           void* pInfo) {
  if (func(pCxt, pSubplan, pNode, pInfo)) {
    return true;
  }
  SNode* pChild;
  FOREACH(pChild, pNode->pChildren) {
    if (splMatchByNode(pCxt, pSubplan, (SLogicNode*)pChild, func, pInfo)) {
      return true;
    }
  }
  // The function returns bool, so report "no match" as false rather than the pointer constant NULL.
  return false;
}

114 115
// Searches pSubplan and then, recursively, its child subplans for a node accepted by `func`.
// The node tree of a subplan that already carries `flag` in its split flags is not searched,
// but its child subplans still are.
// Returns true at the first match, false when nothing matched.
static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag, FSplFindSplitNode func, void* pInfo) {
  bool flagged = SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, flag);
  if (!flagged && splMatchByNode(pCxt, pSubplan, pSubplan->pNode, func, pInfo)) {
    return true;
  }

  SNode* pSub;
  FOREACH(pSub, pSubplan->pChildren) {
    if (splMatch(pCxt, (SLogicSubplan*)pSub, flag, func, pInfo)) {
      return true;
    }
  }
  return false;
}

X
Xiaoyu Wang 已提交
129 130 131 132 133
// Points the parent link of every direct child of pNode back at pNode.
static void splSetParent(SLogicNode* pNode) {
  SNode* pItem = NULL;
  FOREACH(pItem, pNode->pChildren) {
    SLogicNode* pChild = (SLogicNode*)pItem;
    pChild->pParent = pNode;
  }
}

X
Xiaoyu Wang 已提交
134
typedef struct SStableSplitInfo {
X
Xiaoyu Wang 已提交
135 136
  SLogicNode*    pSplitNode;
  SLogicSubplan* pSubplan;
X
Xiaoyu Wang 已提交
137 138
} SStableSplitInfo;

X
Xiaoyu Wang 已提交
139 140 141
// Returns true when pFuncs contains at least one function that is neither a window
// pseudo-column function nor a function accepted by fmIsDistExecFunc.
static bool stbSplHasGatherExecFunc(const SNodeList* pFuncs) {
  SNode* pItem = NULL;
  FOREACH(pItem, pFuncs) {
    const SFunctionNode* pFunc = (const SFunctionNode*)pItem;
    bool                 distributable = fmIsWindowPseudoColumnFunc(pFunc->funcId) || fmIsDistExecFunc(pFunc->funcId);
    if (!distributable) {
      return true;
    }
  }
  return false;
}

150 151
// A scan counts as multi-table when it has a vgroup list with more than one vgroup, or when
// the query is a stream query and the scanned table is a super table.
static bool stbSplIsMultiTbScan(bool streamQuery, SScanLogicNode* pScan) {
  if (NULL != pScan->pVgroupList && pScan->pVgroupList->numOfVgroups > 1) {
    return true;
  }
  return streamQuery && TSDB_SUPER_TABLE == pScan->tableType;
}

155
// Returns true when pNode has exactly one child and that child is a multi-table scan, or is a
// partition node whose single child is a multi-table scan.
static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) {
  if (1 != LIST_LENGTH(pNode->pChildren)) {
    return false;
  }

  SNode* pBelow = nodesListGetNode(pNode->pChildren, 0);
  if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pBelow)) {
    // look through one partition node
    SNodeList* pPartChildren = ((SLogicNode*)pBelow)->pChildren;
    if (1 != LIST_LENGTH(pPartChildren)) {
      return false;
    }
    pBelow = nodesListGetNode(pPartChildren, 0);
  }

  if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pBelow)) {
    return false;
  }
  return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pBelow);
}

169 170 171 172 173 174 175 176 177 178 179 180 181
// Decides whether a window node has to be split.
//  - interval: no gather-exec function and a multi-table scan below;
//  - session:  batch query - a multi-table scan below; stream query - same rule as interval;
//  - state:    batch query - a multi-table scan below; stream query - never;
//  - any other window type: never.
static bool stbSplNeedSplitWindow(bool streamQuery, SLogicNode* pNode) {
  SWindowLogicNode* pWin = (SWindowLogicNode*)pNode;
  switch (pWin->winType) {
    case WINDOW_TYPE_INTERVAL:
      return !stbSplHasGatherExecFunc(pWin->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
    case WINDOW_TYPE_SESSION:
      if (streamQuery) {
        return !stbSplHasGatherExecFunc(pWin->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
      }
      return stbSplHasMultiTbScan(streamQuery, pNode);
    case WINDOW_TYPE_STATE:
      return !streamQuery && stbSplHasMultiTbScan(streamQuery, pNode);
    default:
      break;
  }
  return false;
}

194
// Decides, by node type, whether pNode is a point at which the plan has to be split.
// Node types not listed here are never split.
static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
  bool needSplit = false;
  switch (nodeType(pNode)) {
    case QUERY_NODE_LOGIC_PLAN_SCAN:
      needSplit = stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_JOIN:
      needSplit = !(((SJoinLogicNode*)pNode)->isSingleTableJoin);
      break;
    case QUERY_NODE_LOGIC_PLAN_PARTITION:
    case QUERY_NODE_LOGIC_PLAN_SORT:
      needSplit = stbSplHasMultiTbScan(streamQuery, pNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_AGG:
      needSplit =
          !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
      break;
    case QUERY_NODE_LOGIC_PLAN_WINDOW:
      needSplit = stbSplNeedSplitWindow(streamQuery, pNode);
      break;
    default:
      break;
  }
  return needSplit;
}

X
Xiaoyu Wang 已提交
214 215 216 217
// Match callback: when pNode needs splitting, records it together with its subplan in pInfo
// and returns true; otherwise leaves pInfo untouched and returns false.
static bool stbSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
                                SStableSplitInfo* pInfo) {
  if (!stbSplNeedSplit(pCxt->pPlanCxt->streamQuery, pNode)) {
    return false;
  }
  pInfo->pSplitNode = pNode;
  pInfo->pSubplan = pSubplan;
  return true;
}

X
Xiaoyu Wang 已提交
224 225 226 227 228 229 230 231
// Builds, for every function in pFuncs, a partial-stage and a merge-stage counterpart and
// appends them to *pPartialFuncs and *pMergeFuncs (the lists are created on demand).
// Window pseudo-column functions are cloned into both lists; every other function is
// rewritten via fmGetDistMethod.
// Returns TSDB_CODE_SUCCESS, or the first error code. On error both output lists are
// destroyed and *pPartialFuncs / *pMergeFuncs are reset to NULL, so that an owner of those
// pointers does not see (or free) the destroyed lists again.
static int32_t stbSplRewriteFuns(const SNodeList* pFuncs, SNodeList** pPartialFuncs, SNodeList** pMergeFuncs) {
  SNode* pNode = NULL;
  FOREACH(pNode, pFuncs) {
    SFunctionNode* pFunc = (SFunctionNode*)pNode;
    SFunctionNode* pPartFunc = NULL;
    SFunctionNode* pMergeFunc = NULL;
    int32_t        code = TSDB_CODE_SUCCESS;
    if (fmIsWindowPseudoColumnFunc(pFunc->funcId)) {
      pPartFunc = (SFunctionNode*)nodesCloneNode(pNode);
      pMergeFunc = (SFunctionNode*)nodesCloneNode(pNode);
      if (NULL == pPartFunc || NULL == pMergeFunc) {
        nodesDestroyNode((SNode*)pPartFunc);
        nodesDestroyNode((SNode*)pMergeFunc);
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    } else {
      code = fmGetDistMethod(pFunc, &pPartFunc, &pMergeFunc);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeStrictAppend(pPartialFuncs, (SNode*)pPartFunc);
      if (TSDB_CODE_SUCCESS != code) {
        // pMergeFunc has not been added to any list yet, so nothing else will release it.
        // NOTE(review): assumes the strict append disposes of pPartFunc itself on failure,
        // as its other uses in this file suggest - confirm.
        nodesDestroyNode((SNode*)pMergeFunc);
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeStrictAppend(pMergeFuncs, (SNode*)pMergeFunc);
    }
    if (TSDB_CODE_SUCCESS != code) {
      nodesDestroyList(*pPartialFuncs);
      *pPartialFuncs = NULL;
      nodesDestroyList(*pMergeFuncs);
      *pMergeFuncs = NULL;
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

// Makes sure pFuncs contains a `_wstartts` function and reports its position.
// If a function of type FUNCTION_TYPE_WSTARTTS is already in the list, *pIndex receives its
// index and nothing is changed. Otherwise a new `_wstartts` function node is created,
// resolved with fmGetFuncInfo and appended; *pIndex then holds the index of the appended
// entry (the previous list length).
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error from fmGetFuncInfo / the
// append. The new node is released when fmGetFuncInfo fails.
static int32_t stbSplAppendWStart(SNodeList* pFuncs, int32_t* pIndex) {
  int32_t index = 0;
  SNode*  pFunc = NULL;
  FOREACH(pFunc, pFuncs) {
    if (FUNCTION_TYPE_WSTARTTS == ((SFunctionNode*)pFunc)->funcType) {
      *pIndex = index;
      return TSDB_CODE_SUCCESS;
    }
    ++index;
  }

  SFunctionNode* pWStart = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
  if (NULL == pWStart) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  *pIndex = index;
  strcpy(pWStart->functionName, "_wstartts");
  // %p requires a void* argument; the node address keeps the alias unique
  snprintf(pWStart->node.aliasName, sizeof(pWStart->node.aliasName), "%s.%p", pWStart->functionName, (void*)pWStart);
  int32_t code = fmGetFuncInfo(pWStart, NULL, 0);
  if (TSDB_CODE_SUCCESS != code) {
    // not appended anywhere yet, so it has to be freed here
    nodesDestroyNode((SNode*)pWStart);
    return code;
  }
  return nodesListStrictAppend(pFuncs, (SNode*)pWStart);
}

5
54liuyao 已提交
282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309
// Makes sure the window's function list contains a `_wendts` function and reports its
// position.
// If a function of type FUNCTION_TYPE_WENDTS is already present, *pIndex receives its index
// and nothing is changed. Otherwise a new `_wendts` function node is created, resolved with
// fmGetFuncInfo, appended to pWin->pFuncs, and a matching column is added to the window's
// targets via createColumnByRewriteExpr; *pIndex is the index of the appended function.
// Returns TSDB_CODE_SUCCESS or the first error code. The new node is released when
// fmGetFuncInfo fails.
static int32_t stbSplAppendWEnd(SWindowLogicNode* pWin, int32_t* pIndex) {
  int32_t index = 0;
  SNode*  pFunc = NULL;
  FOREACH(pFunc, pWin->pFuncs) {
    if (FUNCTION_TYPE_WENDTS == ((SFunctionNode*)pFunc)->funcType) {
      *pIndex = index;
      return TSDB_CODE_SUCCESS;
    }
    ++index;
  }

  SFunctionNode* pWEnd = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
  if (NULL == pWEnd) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  *pIndex = index;
  strcpy(pWEnd->functionName, "_wendts");
  // %p requires a void* argument; the node address keeps the alias unique
  snprintf(pWEnd->node.aliasName, sizeof(pWEnd->node.aliasName), "%s.%p", pWEnd->functionName, (void*)pWEnd);
  int32_t code = fmGetFuncInfo(pWEnd, NULL, 0);
  if (TSDB_CODE_SUCCESS != code) {
    // not appended anywhere yet, so it has to be freed here
    nodesDestroyNode((SNode*)pWEnd);
    return code;
  }
  code = nodesListStrictAppend(pWin->pFuncs, (SNode*)pWEnd);
  if (TSDB_CODE_SUCCESS == code) {
    code = createColumnByRewriteExpr(nodesListGetNode(pWin->pFuncs, index), &pWin->node.pTargets);
  }
  return code;
}

X
Xiaoyu Wang 已提交
310 311 312 313 314 315 316 317 318
// Splits a window node into a partial window (returned in *pPartWindow) and a merge window
// (pMergeWindow, modified in place).
// The partial window is a clone of the merge window that takes over its children and
// receives the partial-stage functions, a `_wstartts` function and targets rebuilt from
// those functions. The merge window keeps its targets, receives the merge-stage functions
// and gets pTspk replaced by a clone of the partial window's `_wstartts` target.
// Returns TSDB_CODE_SUCCESS or the first error code. If the clone itself fails, pMergeWindow
// is restored to its original state; on later failures the partial window (and the children
// already moved to it) is destroyed.
static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogicNode** pPartWindow) {
  // Detach the pieces that must not be duplicated by the clone below.
  SNodeList* pFunc = pMergeWindow->pFuncs;
  pMergeWindow->pFuncs = NULL;
  SNodeList* pTargets = pMergeWindow->node.pTargets;
  pMergeWindow->node.pTargets = NULL;
  SNodeList* pChildren = pMergeWindow->node.pChildren;
  pMergeWindow->node.pChildren = NULL;

  SWindowLogicNode* pPartWin = (SWindowLogicNode*)nodesCloneNode((SNode*)pMergeWindow);
  if (NULL == pPartWin) {
    // Nothing has been built yet: put everything back instead of leaking the detached lists.
    pMergeWindow->pFuncs = pFunc;
    pMergeWindow->node.pTargets = pTargets;
    pMergeWindow->node.pChildren = pChildren;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMergeWindow->node.pTargets = pTargets;
  pPartWin->node.pChildren = pChildren;
  splSetParent((SLogicNode*)pPartWin);
  int32_t code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, &pMergeWindow->pFuncs);

  int32_t index = 0;
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplAppendWStart(pPartWin->pFuncs, &index);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = createColumnByRewriteExprs(pPartWin->pFuncs, &pPartWin->node.pTargets);
  }
  if (TSDB_CODE_SUCCESS == code) {
    // the merge window's primary timestamp is the window start produced by the partial window
    nodesDestroyNode(pMergeWindow->pTspk);
    pMergeWindow->pTspk = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index));
    if (NULL == pMergeWindow->pTspk) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // the original function list has been replaced by the partial/merge lists
  nodesDestroyList(pFunc);
  if (TSDB_CODE_SUCCESS == code) {
    *pPartWindow = (SLogicNode*)pPartWin;
  } else {
    nodesDestroyNode((SNode*)pPartWin);
  }

  return code;
}

X
Xiaoyu Wang 已提交
355 356 357 358 359 360 361 362 363 364 365
// Returns the number of vgroups of the scan reached from pNode through single-child nodes.
// Returns 0 when no scan is reachable that way, or when the scan has no vgroup list
// (stbSplIsMultiTbScan treats a NULL list as a valid state, so it must not be dereferenced).
static int32_t stbSplGetNumOfVgroups(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    SScanLogicNode* pScan = (SScanLogicNode*)pNode;
    return (NULL == pScan->pVgroupList) ? 0 : pScan->pVgroupList->numOfVgroups;
  } else {
    if (1 == LIST_LENGTH(pNode->pChildren)) {
      return stbSplGetNumOfVgroups((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
    }
  }
  return 0;
}

X
Xiaoyu Wang 已提交
366
static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
367
                                     SNodeList* pMergeKeys, SLogicNode* pPartChild, bool groupSort) {
368
  SMergeLogicNode* pMerge = (SMergeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE);
X
Xiaoyu Wang 已提交
369 370 371
  if (NULL == pMerge) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
372
  pMerge->numOfChannels = stbSplGetNumOfVgroups(pPartChild);
X
Xiaoyu Wang 已提交
373 374
  pMerge->srcGroupId = pCxt->groupId;
  pMerge->node.precision = pPartChild->precision;
X
Xiaoyu Wang 已提交
375
  pMerge->pMergeKeys = pMergeKeys;
376
  pMerge->groupSort = groupSort;
X
Xiaoyu Wang 已提交
377 378 379

  int32_t code = TSDB_CODE_SUCCESS;
  pMerge->pInputs = nodesCloneList(pPartChild->pTargets);
X
Xiaoyu Wang 已提交
380
  // NULL != pSubplan means 'merge node' replaces 'split node'.
381 382 383 384 385
  if (NULL == pSubplan) {
    pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets);
  } else {
    pMerge->node.pTargets = nodesCloneList(pSplitNode->pTargets);
  }
X
Xiaoyu Wang 已提交
386 387 388 389 390
  if (NULL == pMerge->node.pTargets || NULL == pMerge->pInputs) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }
  if (TSDB_CODE_SUCCESS == code) {
    if (NULL == pSubplan) {
391
      code = nodesListMakeAppend(&pSplitNode->pChildren, (SNode*)pMerge);
X
Xiaoyu Wang 已提交
392
    } else {
X
Xiaoyu Wang 已提交
393
      code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge);
X
Xiaoyu Wang 已提交
394 395 396
    }
  }
  if (TSDB_CODE_SUCCESS != code) {
397
    nodesDestroyNode((SNode*)pMerge);
X
Xiaoyu Wang 已提交
398
  }
X
Xiaoyu Wang 已提交
399
  return code;
X
Xiaoyu Wang 已提交
400 401
}

X
Xiaoyu Wang 已提交
402 403 404 405
// Creates an exchange node standing in for pPartChild and appends it as a child of pParent.
// Returns TSDB_CODE_SUCCESS or the first error code. The exchange node is released when it
// cannot be appended, because the plain (non-strict) append leaves ownership with the caller.
static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent, SLogicNode* pPartChild) {
  SExchangeLogicNode* pExchange = NULL;
  int32_t             code = splCreateExchangeNode(pCxt, pPartChild, &pExchange);
  if (TSDB_CODE_SUCCESS == code) {
    pExchange->node.pParent = pParent;
    code = nodesListMakeAppend(&pParent->pChildren, (SNode*)pExchange);
    if (TSDB_CODE_SUCCESS != code) {
      nodesDestroyNode((SNode*)pExchange);
    }
  }
  return code;
}

X
Xiaoyu Wang 已提交
412
// Appends to *pMergeKeys (created on demand) one order-by expression over a clone of
// pPrimaryKey, ascending with NULLs first.
// Returns TSDB_CODE_OUT_OF_MEMORY when the expression or the clone cannot be created,
// otherwise the result of the append.
static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, SNodeList** pMergeKeys) {
  SOrderByExprNode* pKey = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
  if (NULL == pKey) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNode* pKeyExpr = nodesCloneNode(pPrimaryKey);
  pKey->pExpr = pKeyExpr;
  if (NULL == pKeyExpr) {
    nodesDestroyNode((SNode*)pKey);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pKey->nullOrder = NULL_ORDER_FIRST;
  pKey->order = ORDER_ASC;
  return nodesListMakeStrictAppend(pMergeKeys, (SNode*)pKey);
}

427
// Batch-query split of an interval window: the partial window (hash algorithm) moves into a
// new scan subplan, while the original window (merge algorithm) reads it through a merge node
// keyed on the window's primary timestamp.
// The enclosing subplan is marked SUBPLAN_TYPE_MERGE and the context's groupId is advanced
// whether or not the split succeeded. Returns TSDB_CODE_SUCCESS or the first error code.
static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SWindowLogicNode* pMergeWin = (SWindowLogicNode*)pInfo->pSplitNode;
  SLogicNode*       pPartWin = NULL;

  int32_t code = stbSplCreatePartWindowNode(pMergeWin, &pPartWin);
  if (TSDB_CODE_SUCCESS == code) {
    ((SWindowLogicNode*)pPartWin)->windowAlgo = INTERVAL_ALGO_HASH;
    pMergeWin->windowAlgo = INTERVAL_ALGO_MERGE;

    SNodeList* pKeys = NULL;
    code = stbSplCreateMergeKeysByPrimaryKey(pMergeWin->pTspk, &pKeys);
    if (TSDB_CODE_SUCCESS == code) {
      code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pKeys, pPartWin, true);
    }
    if (TSDB_CODE_SUCCESS != code) {
      nodesDestroyList(pKeys);
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pPartWin, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}

451
// Stream-query split of an interval window: the partial window (stream-semi algorithm) moves
// into a new scan subplan, while the original window (stream-final algorithm) reads it through
// an exchange node.
// The enclosing subplan is marked SUBPLAN_TYPE_MERGE and the context's groupId is advanced
// whether or not the split succeeded. Returns TSDB_CODE_SUCCESS or the first error code.
static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SWindowLogicNode* pFinalWin = (SWindowLogicNode*)pInfo->pSplitNode;
  SLogicNode*       pSemiWin = NULL;

  int32_t code = stbSplCreatePartWindowNode(pFinalWin, &pSemiWin);
  if (TSDB_CODE_SUCCESS == code) {
    ((SWindowLogicNode*)pSemiWin)->windowAlgo = INTERVAL_ALGO_STREAM_SEMI;
    pFinalWin->windowAlgo = INTERVAL_ALGO_STREAM_FINAL;
    code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pSemiWin);
  }
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pSemiWin, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}

468
// Dispatches the interval-window split to the stream or the batch variant.
static int32_t stbSplSplitInterval(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  return pCxt->pPlanCxt->streamQuery ? stbSplSplitIntervalForStream(pCxt, pInfo)
                                     : stbSplSplitIntervalForBatch(pCxt, pInfo);
}

476 477 478 479
// Stream-query split of a session window: the partial window (stream-semi algorithm) moves
// into a new scan subplan, while the original window (stream-final algorithm) reads it through
// an exchange node. The partial window is given a `_wendts` function whose target is cloned
// into the final window's pTsEnd.
// The enclosing subplan is marked SUBPLAN_TYPE_MERGE and the context's groupId is advanced
// whether or not the split succeeded. Returns TSDB_CODE_SUCCESS or the first error code.
static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pPartWindow = NULL;
  int32_t     code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
  if (TSDB_CODE_SUCCESS == code) {
    SWindowLogicNode* pPartWin = (SWindowLogicNode*)pPartWindow;
    SWindowLogicNode* pMergeWin = (SWindowLogicNode*)pInfo->pSplitNode;
    pPartWin->windowAlgo = SESSION_ALGO_STREAM_SEMI;
    pMergeWin->windowAlgo = SESSION_ALGO_STREAM_FINAL;
    int32_t index = 0;
    // Deliberately the function-level `code`: a second, shadowing declaration here would
    // swallow every error raised inside this block.
    code = stbSplAppendWEnd(pPartWin, &index);
    if (TSDB_CODE_SUCCESS == code) {
      pMergeWin->pTsEnd = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index));
      if (NULL == pMergeWin->pTsEnd) {
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      // only wire up the exchange once the window-end column is in place
      code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
                                     (SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}

503
// Walks down from pNode through single-child nodes to the scan node, switches it to
// SCAN_TYPE_TABLE_MERGE and turns on groupSort when the scan has group tags. Does nothing
// when a non-scan node on the way has a child count other than one.
static void stbSplSetTableMergeScan(SLogicNode* pNode) {
  SLogicNode* pCur = pNode;
  while (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pCur)) {
    if (1 != LIST_LENGTH(pCur->pChildren)) {
      return;
    }
    pCur = (SLogicNode*)nodesListGetNode(pCur->pChildren, 0);
  }

  SScanLogicNode* pScan = (SScanLogicNode*)pCur;
  pScan->scanType = SCAN_TYPE_TABLE_MERGE;
  if (NULL != pScan->pGroupTags) {
    pScan->groupSort = true;
  }
}

// Batch-query split of a session or state window: the window stays in the current subplan,
// and its first child is moved into a new scan subplan and replaced by a merge node keyed on
// the window's primary timestamp. On success the scan below the child is switched to a
// table-merge scan, the subplan becomes SUBPLAN_TYPE_MERGE, is flagged as stable-split, and
// the context's groupId is advanced.
// Returns TSDB_CODE_SUCCESS or the first error code.
static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pWindow = pInfo->pSplitNode;
  SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pWindow->pChildren, 0);

  SNodeList* pMergeKeys = NULL;
  int32_t    code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk, &pMergeKeys);

  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild, true);
    if (TSDB_CODE_SUCCESS == code) {
      // The merge node, now part of the plan, stores pMergeKeys; forget the local pointer so
      // a later failure does not destroy a list the plan still references.
      pMergeKeys = NULL;
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
                                     (SNode*)splCreateScanSubplan(pCxt, pChild, SPLIT_FLAG_STABLE_SPLIT));
  }

  if (TSDB_CODE_SUCCESS == code) {
    stbSplSetTableMergeScan(pChild);
    pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
    SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
    ++(pCxt->groupId);
  } else {
    nodesDestroyList(pMergeKeys);
  }

  return code;
}

545
// Dispatches the session-window split to the stream or the batch variant.
static int32_t stbSplSplitSession(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  return pCxt->pPlanCxt->streamQuery ? stbSplSplitSessionForStream(pCxt, pInfo)
                                     : stbSplSplitSessionOrStateForBatch(pCxt, pInfo);
}

// Always returns TSDB_CODE_PLAN_INTERNAL_ERROR; neither argument is used.
// stbSplNeedSplitWindow never selects a state window for splitting in a stream query.
static int32_t stbSplSplitStateForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
}

// Dispatches the state-window split to the stream or the batch variant.
static int32_t stbSplSplitState(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  return pCxt->pPlanCxt->streamQuery ? stbSplSplitStateForStream(pCxt, pInfo)
                                     : stbSplSplitSessionOrStateForBatch(pCxt, pInfo);
}

X
Xiaoyu Wang 已提交
565 566
// Returns the keys by which pNode partitions its data: the group tags of a scan node, the
// partition keys of a partition node, NULL for every other node type. The list is not copied.
static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) {
  switch (nodeType(pNode)) {
    case QUERY_NODE_LOGIC_PLAN_SCAN:
      return ((SScanLogicNode*)pNode)->pGroupTags;
    case QUERY_NODE_LOGIC_PLAN_PARTITION:
      return ((SPartitionLogicNode*)pNode)->pPartitionKeys;
    default:
      break;
  }
  return NULL;
}

// Returns true when pPartKeys consists of exactly one key and that key is either a function
// of type FUNCTION_TYPE_TBNAME or a column of type COLUMN_TYPE_TBNAME.
static bool stbSplIsPartTbanme(SNodeList* pPartKeys) {
  if (NULL == pPartKeys || 1 != LIST_LENGTH(pPartKeys)) {
    return false;
  }

  SNode* pKey = nodesListGetNode(pPartKeys, 0);
  if (QUERY_NODE_FUNCTION == nodeType(pKey)) {
    return FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pKey)->funcType;
  }
  if (QUERY_NODE_COLUMN == nodeType(pKey)) {
    return COLUMN_TYPE_TBNAME == ((SColumnNode*)pKey)->colType;
  }
  return false;
}

584
// Returns true when the window's first child partitions its data by table name only.
static bool stbSplIsPartTableWinodw(SWindowLogicNode* pWindow) {
  SLogicNode* pFirstChild = (SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0);
  SNodeList*  pPartKeys = stbSplGetPartKeys(pFirstChild);
  return stbSplIsPartTbanme(pPartKeys);
}

588
// Splits a window that is not partitioned by table name, dispatching on the window type.
// Returns TSDB_CODE_PLAN_INTERNAL_ERROR for a window type other than interval, session or
// state.
static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SWindowLogicNode* pWin = (SWindowLogicNode*)pInfo->pSplitNode;
  int32_t           code = TSDB_CODE_PLAN_INTERNAL_ERROR;
  switch (pWin->winType) {
    case WINDOW_TYPE_INTERVAL:
      code = stbSplSplitInterval(pCxt, pInfo);
      break;
    case WINDOW_TYPE_SESSION:
      code = stbSplSplitSession(pCxt, pInfo);
      break;
    case WINDOW_TYPE_STATE:
      code = stbSplSplitState(pCxt, pInfo);
      break;
    default:
      break;
  }
  return code;
}

602
// Splits a window that is partitioned by table name.
// Stream query: nothing is split; the subplan is only flagged as stable-split.
// Batch query: the window node is replaced in the current subplan by an exchange node and is
// moved, with everything below it, into a new scan subplan. The subplan is marked
// SUBPLAN_TYPE_MERGE and the context's groupId is advanced whether or not this succeeded.
// Returns TSDB_CODE_SUCCESS or the first error code. The exchange node is released when it
// cannot be put into the plan.
static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  if (pCxt->pPlanCxt->streamQuery) {
    SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
    return TSDB_CODE_SUCCESS;
  }

  SExchangeLogicNode* pExchange = NULL;
  int32_t             code = splCreateExchangeNode(pCxt, pInfo->pSplitNode, &pExchange);
  if (TSDB_CODE_SUCCESS == code) {
    code = replaceLogicNode(pInfo->pSubplan, pInfo->pSplitNode, (SLogicNode*)pExchange);
    if (TSDB_CODE_SUCCESS != code) {
      // same cleanup as splCreateExchangeNodeForSubplan: the node never made it into the plan
      nodesDestroyNode((SNode*)pExchange);
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
                                     (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}

// Splits a window node, choosing the strategy by whether the window's input is partitioned
// by table name.
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  bool partByTable = stbSplIsPartTableWinodw((SWindowLogicNode*)pInfo->pSplitNode);
  return partByTable ? stbSplSplitWindowForPartTable(pCxt, pInfo) : stbSplSplitWindowForCrossTable(pCxt, pInfo);
}

X
Xiaoyu Wang 已提交
630 631 632 633 634 635 636 637 638
// Splits an aggregate node into a partial aggregate (returned in *pOutput) and a merge
// aggregate (pMergeAgg, modified in place).
// The partial aggregate is a clone of the merge aggregate that takes over its children and
// group keys, receives the partial-stage functions and gets targets rebuilt from the group
// keys and those functions. The merge aggregate keeps its targets and conditions, receives
// the merge-stage functions and, when there are group keys, groups by a clone of the partial
// aggregate's group-key targets.
// Returns TSDB_CODE_SUCCESS or the first error code. If the clone itself fails, pMergeAgg is
// restored to its original state; on later failures the partial aggregate (with the children
// and group keys already moved to it) is destroyed.
static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) {
  // Detach the pieces that must not be duplicated by the clone below.
  SNodeList* pFunc = pMergeAgg->pAggFuncs;
  pMergeAgg->pAggFuncs = NULL;
  SNodeList* pGroupKeys = pMergeAgg->pGroupKeys;
  pMergeAgg->pGroupKeys = NULL;
  SNodeList* pTargets = pMergeAgg->node.pTargets;
  pMergeAgg->node.pTargets = NULL;
  SNodeList* pChildren = pMergeAgg->node.pChildren;
  pMergeAgg->node.pChildren = NULL;
  SNode* pConditions = pMergeAgg->node.pConditions;
  pMergeAgg->node.pConditions = NULL;

  SAggLogicNode* pPartAgg = (SAggLogicNode*)nodesCloneNode((SNode*)pMergeAgg);
  if (NULL == pPartAgg) {
    // Nothing has been built yet: put everything back instead of leaking the detached parts.
    pMergeAgg->pAggFuncs = pFunc;
    pMergeAgg->pGroupKeys = pGroupKeys;
    pMergeAgg->node.pTargets = pTargets;
    pMergeAgg->node.pChildren = pChildren;
    pMergeAgg->node.pConditions = pConditions;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // Hand every detached part to its final owner right away so that no later error path can
  // lose track of it.
  pMergeAgg->node.pConditions = pConditions;
  pMergeAgg->node.pTargets = pTargets;
  pPartAgg->node.pChildren = pChildren;
  splSetParent((SLogicNode*)pPartAgg);

  int32_t code = TSDB_CODE_SUCCESS;
  if (NULL != pGroupKeys) {
    pPartAgg->pGroupKeys = pGroupKeys;
    code = createColumnByRewriteExprs(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets);
    if (TSDB_CODE_SUCCESS == code) {
      // at this point the partial aggregate's targets are exactly the group-key columns
      pMergeAgg->pGroupKeys = nodesCloneList(pPartAgg->node.pTargets);
      if (NULL == pMergeAgg->pGroupKeys) {
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, &pMergeAgg->pAggFuncs);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = createColumnByRewriteExprs(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets);
  }

  // the original function list has been replaced by the partial/merge lists
  nodesDestroyList(pFunc);
  if (TSDB_CODE_SUCCESS == code) {
    *pOutput = (SLogicNode*)pPartAgg;
  } else {
    nodesDestroyNode((SNode*)pPartAgg);
  }

  return code;
}

// Splits an aggregate node: the partial aggregate moves into a new scan subplan, and the
// original (merge) aggregate reads it through an exchange node.
// The enclosing subplan is marked SUBPLAN_TYPE_MERGE and the context's groupId is advanced
// whether or not the split succeeded. Returns TSDB_CODE_SUCCESS or the first error code.
static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pPartial = NULL;

  int32_t code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartial);
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartial);
  }
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pPartial, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}

X
Xiaoyu Wang 已提交
695
// Creates a column node that refers to pExpr by its alias: column name and alias are both
// pExpr's alias, the result type is copied, and the table alias is copied too when pExpr is
// itself a column. Returns NULL when the node cannot be allocated.
static SNode* stbSplCreateColumnNode(SExprNode* pExpr) {
  SColumnNode* pNewCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
  if (NULL != pNewCol) {
    if (QUERY_NODE_COLUMN == nodeType(pExpr)) {
      strcpy(pNewCol->tableAlias, ((SColumnNode*)pExpr)->tableAlias);
    }
    strcpy(pNewCol->colName, pExpr->aliasName);
    strcpy(pNewCol->node.aliasName, pExpr->aliasName);
    pNewCol->node.resType = pExpr->resType;
  }
  return (SNode*)pNewCol;
}

// Creates an order-by expression over a clone of pCol that uses the ordering and the NULL
// ordering of pSortKey. Returns NULL when the expression or the clone cannot be created.
static SNode* stbSplCreateOrderByExpr(SOrderByExprNode* pSortKey, SNode* pCol) {
  SOrderByExprNode* pOrderBy = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
  if (NULL == pOrderBy) {
    return NULL;
  }

  SNode* pColCopy = nodesCloneNode(pCol);
  pOrderBy->pExpr = pColCopy;
  if (NULL == pColCopy) {
    nodesDestroyNode((SNode*)pOrderBy);
    return NULL;
  }

  pOrderBy->nullOrder = pSortKey->nullOrder;
  pOrderBy->order = pSortKey->order;
  return (SNode*)pOrderBy;
}

// Translates sort keys into merge keys expressed over pTargets.
// For every sort key, each target that equals the sort expression (when that expression is a
// column) or whose column name equals the sort expression's alias yields one merge key. When
// no target matches, a new column named after the alias is created, used for the merge key
// and appended to pTargets.
// On success *pOutput receives the merge key list; on failure the partial list is destroyed
// and *pOutput is left untouched. Returns TSDB_CODE_SUCCESS or the first error code.
static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets, SNodeList** pOutput) {
  int32_t    code = TSDB_CODE_SUCCESS;
  SNodeList* pKeys = NULL;
  SNode*     pItem = NULL;
  FOREACH(pItem, pSortKeys) {
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)pItem;
    SExprNode*        pExpr = (SExprNode*)pSortKey->pExpr;
    bool              matched = false;
    SNode*            pTarget = NULL;
    FOREACH(pTarget, pTargets) {
      bool hit = (QUERY_NODE_COLUMN == nodeType(pExpr) && nodesEqualNode((SNode*)pExpr, pTarget)) ||
                 (0 == strcmp(pExpr->aliasName, ((SColumnNode*)pTarget)->colName));
      if (hit) {
        code = nodesListMakeStrictAppend(&pKeys, stbSplCreateOrderByExpr(pSortKey, pTarget));
        if (TSDB_CODE_SUCCESS != code) {
          break;
        }
        matched = true;
      }
    }
    if (TSDB_CODE_SUCCESS == code && !matched) {
      // no existing target carries this sort expression: expose it as an extra target column
      SNode* pNewCol = stbSplCreateColumnNode(pExpr);
      code = nodesListMakeStrictAppend(&pKeys, stbSplCreateOrderByExpr(pSortKey, pNewCol));
      if (TSDB_CODE_SUCCESS != code) {
        nodesDestroyNode(pNewCol);
      } else {
        code = nodesListStrictAppend(pTargets, pNewCol);
      }
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyList(pKeys);
    return code;
  }
  *pOutput = pKeys;
  return code;
}

/**
 * Create the partial sort node that takes over the sort keys and children of
 * pSort, together with the merge keys derived from those sort keys.
 *
 * The sort keys and children are detached from pSort before it is cloned, so
 * the clone does not copy them; they are then handed to the clone.
 *
 * @param pSort            sort node being split; loses its sort keys and
 *                         children once the clone has been created
 * @param pOutputPartSort  receives the partial sort node on success
 * @param pOutputMergeKeys receives the merge key list on success
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the clone fails, or
 *         the error code from stbSplCreateMergeKeys
 */
static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOutputPartSort,
                                        SNodeList** pOutputMergeKeys) {
  SNodeList* pSortKeys = pSort->pSortKeys;
  pSort->pSortKeys = NULL;
  SNodeList* pChildren = pSort->node.pChildren;
  pSort->node.pChildren = NULL;

  SSortLogicNode* pPartSort = (SSortLogicNode*)nodesCloneNode((SNode*)pSort);
  if (NULL == pPartSort) {
    // nothing took ownership of the detached lists: give them back to pSort
    // so they are neither leaked nor lost from the plan
    pSort->pSortKeys = pSortKeys;
    pSort->node.pChildren = pChildren;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pPartSort->node.pChildren = pChildren;
  splSetParent((SLogicNode*)pPartSort);
  pPartSort->pSortKeys = pSortKeys;
  pPartSort->groupSort = pSort->groupSort;

  SNodeList* pMergeKeys = NULL;
  int32_t    code = stbSplCreateMergeKeys(pPartSort->pSortKeys, pPartSort->node.pTargets, &pMergeKeys);
  if (TSDB_CODE_SUCCESS != code) {
    // the partial sort node now holds the sort keys and children
    nodesDestroyNode((SNode*)pPartSort);
    nodesDestroyList(pMergeKeys);
    return code;
  }

  *pOutputPartSort = (SLogicNode*)pPartSort;
  *pOutputMergeKeys = pMergeKeys;
  return TSDB_CODE_SUCCESS;
}

797 798 799 800 801 802 803 804 805 806 807 808 809
/**
 * Follow the chain of single children below pNode and, when it ends in a scan
 * node that has group tags, turn on that scan's groupSort flag.
 *
 * The walk stops without changing anything as soon as a non-scan node does
 * not have exactly one child.
 */
static void stbSplSetScanPartSort(SLogicNode* pNode) {
  SLogicNode* pCur = pNode;
  while (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pCur)) {
    if (1 != LIST_LENGTH(pCur->pChildren)) {
      return;
    }
    pCur = (SLogicNode*)nodesListGetNode(pCur->pChildren, 0);
  }

  SScanLogicNode* pScan = (SScanLogicNode*)pCur;
  if (NULL != pScan->pGroupTags) {
    pScan->groupSort = true;
  }
}

X
Xiaoyu Wang 已提交
810 811
/**
 * Split a sort node: a partial sort node takes over the sort work, a merge
 * node is created for the current subplan, and the partial sort is moved
 * into a new scan subplan appended to the subplan's children.
 *
 * The subplan type is set to SUBPLAN_TYPE_MERGE and the context group id is
 * advanced whether or not the split succeeded.
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered
 */
static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  // read the flag before the sort node is modified by the partial-sort creation
  bool        isGroupSort = ((SSortLogicNode*)pInfo->pSplitNode)->groupSort;
  SLogicNode* pPartial = NULL;
  SNodeList*  pKeys = NULL;

  int32_t code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartial, &pKeys);
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pKeys, pPartial, isGroupSort);
  }
  if (TSDB_CODE_SUCCESS == code) {
    if (isGroupSort) {
      stbSplSetScanPartSort(pPartial);
    }
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pPartial, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}

X
Xiaoyu Wang 已提交
830
/**
 * Split at a scan node: an exchange node is created for the subplan (which
 * is requested as SUBPLAN_TYPE_MERGE) and the scan node is moved into a new
 * scan subplan appended to the subplan's children.
 *
 * The context group id is advanced even when an error is returned.
 */
static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicSubplan* pOwner = pInfo->pSubplan;
  SLogicNode*    pScan = pInfo->pSplitNode;

  int32_t code = splCreateExchangeNodeForSubplan(pCxt, pOwner, pScan, SUBPLAN_TYPE_MERGE);
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pScan, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pOwner->pChildren, (SNode*)pScanSubplan);
  }

  ++(pCxt->groupId);
  return code;
}

X
Xiaoyu Wang 已提交
840 841 842 843 844 845 846 847 848 849 850 851 852 853
/**
 * Return the first scan column whose column id is
 * PRIMARYKEY_TIMESTAMP_COL_ID, or NULL when the scan has no such column.
 */
static SNode* stbSplFindPrimaryKeyFromScan(SScanLogicNode* pScan) {
  SNode* pPrimaryKey = NULL;
  SNode* pScanCol = NULL;
  FOREACH(pScanCol, pScan->pScanCols) {
    if (PRIMARYKEY_TIMESTAMP_COL_ID != ((SColumnNode*)pScanCol)->colId) {
      continue;
    }
    pPrimaryKey = pScanCol;
    break;
  }
  return pPrimaryKey;
}

/**
 * Split one scan child of a join: merge keys are built from the scan's
 * primary key column, a merge node is created for the subplan, and the scan
 * is moved into a new scan subplan appended to the subplan's children.
 *
 * The scan type is set to SCAN_TYPE_TABLE_MERGE and the context group id is
 * advanced regardless of the outcome.
 */
static int32_t stbSplSplitScanNodeForJoin(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan) {
  SLogicNode* pScanNode = (SLogicNode*)pScan;
  SNode*      pPrimaryKey = stbSplFindPrimaryKeyFromScan(pScan);
  SNodeList*  pKeys = NULL;

  int32_t code = stbSplCreateMergeKeysByPrimaryKey(pPrimaryKey, &pKeys);
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateMergeNode(pCxt, pSubplan, pScanNode, pKeys, pScanNode, false);
  }
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pScanNode, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  pScan->scanType = SCAN_TYPE_TABLE_MERGE;
  ++(pCxt->groupId);
  return code;
}

/**
 * Split every child of a join node. Scan children are split directly, join
 * children are handled recursively, and any other child type yields
 * TSDB_CODE_PLAN_INTERNAL_ERROR. Processing stops at the first error.
 */
static int32_t stbSplSplitJoinNodeImpl(SSplitContext* pCxt, SLogicSubplan* pSubplan, SJoinLogicNode* pJoin) {
  int32_t code = TSDB_CODE_SUCCESS;
  SNode*  pChild = NULL;
  FOREACH(pChild, pJoin->node.pChildren) {
    switch (nodeType(pChild)) {
      case QUERY_NODE_LOGIC_PLAN_SCAN:
        code = stbSplSplitScanNodeForJoin(pCxt, pSubplan, (SScanLogicNode*)pChild);
        break;
      case QUERY_NODE_LOGIC_PLAN_JOIN:
        code = stbSplSplitJoinNodeImpl(pCxt, pSubplan, (SJoinLogicNode*)pChild);
        break;
      default:
        code = TSDB_CODE_PLAN_INTERNAL_ERROR;
        break;
    }
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }
  return code;
}

X
Xiaoyu Wang 已提交
883
/**
 * Split a join node. On success the owning subplan becomes a merge subplan
 * and is flagged with SPLIT_FLAG_STABLE_SPLIT; on failure the subplan is left
 * untouched by this function and the error code is returned.
 */
static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicSubplan* pOwner = pInfo->pSubplan;
  int32_t        code = stbSplSplitJoinNodeImpl(pCxt, pOwner, (SJoinLogicNode*)pInfo->pSplitNode);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
  return code;
}

892 893 894 895 896 897 898 899 900 901
/**
 * Split at a partition node: a merge node without merge keys is created for
 * the subplan and the partition node is moved into a new scan subplan
 * appended to the subplan's children.
 *
 * The context group id is advanced even when an error is returned.
 */
static int32_t stbSplSplitPartitionNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pPartition = pInfo->pSplitNode;

  int32_t code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pPartition, NULL, pPartition, true);
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pPartition, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  ++(pCxt->groupId);
  return code;
}

X
Xiaoyu Wang 已提交
902
/**
 * Split rule "SuperTableSplit".
 *
 * Does nothing for rSma queries or when splMatch finds no split node.
 * Otherwise the split is dispatched on the type of the node that was found
 * (scan, join, partition, agg, window or sort); other node types are left
 * alone. Once a node was matched, pCxt->split is set to true even if the
 * dispatched handler returned an error.
 *
 * @return TSDB_CODE_SUCCESS or the error code of the dispatched handler
 */
static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SStableSplitInfo splitInfo = {0};
  if (pCxt->pPlanCxt->rSmaQuery ||
      !splMatch(pCxt, pSubplan, SPLIT_FLAG_STABLE_SPLIT, (FSplFindSplitNode)stbSplFindSplitNode, &splitInfo)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t       result = TSDB_CODE_SUCCESS;
  const int32_t splitNodeType = nodeType(splitInfo.pSplitNode);
  if (QUERY_NODE_LOGIC_PLAN_SCAN == splitNodeType) {
    result = stbSplSplitScanNode(pCxt, &splitInfo);
  } else if (QUERY_NODE_LOGIC_PLAN_JOIN == splitNodeType) {
    result = stbSplSplitJoinNode(pCxt, &splitInfo);
  } else if (QUERY_NODE_LOGIC_PLAN_PARTITION == splitNodeType) {
    result = stbSplSplitPartitionNode(pCxt, &splitInfo);
  } else if (QUERY_NODE_LOGIC_PLAN_AGG == splitNodeType) {
    result = stbSplSplitAggNode(pCxt, &splitInfo);
  } else if (QUERY_NODE_LOGIC_PLAN_WINDOW == splitNodeType) {
    result = stbSplSplitWindowNode(pCxt, &splitInfo);
  } else if (QUERY_NODE_LOGIC_PLAN_SORT == splitNodeType) {
    result = stbSplSplitSortNode(pCxt, &splitInfo);
  }

  pCxt->split = true;
  return result;
}

X
Xiaoyu Wang 已提交
940 941 942 943 944 945
// Result of the single-table-join split search (filled by sigTbJoinSplFindSplitNode).
typedef struct SSigTbJoinSplitInfo {
  SJoinLogicNode* pJoin;       // the join node that needs to be split
  SLogicNode*     pSplitNode;  // the join's child at index 1, the node to move out
  SLogicSubplan*  pSubplan;    // the subplan in which the join was found
} SSigTbJoinSplitInfo;

X
Xiaoyu Wang 已提交
946 947 948 949 950 951
/**
 * Tell whether pNode is a single-table join that still has to be split:
 * it must be a join node flagged isSingleTableJoin, and neither of its first
 * two children may already be an exchange node.
 */
static bool sigTbJoinSplNeedSplit(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode) || !((SJoinLogicNode*)pNode)->isSingleTableJoin) {
    return false;
  }

  SNodeList* pInputs = ((SJoinLogicNode*)pNode)->node.pChildren;
  if (QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(nodesListGetNode(pInputs, 0))) {
    return false;
  }
  return QUERY_NODE_LOGIC_PLAN_EXCHANGE != nodeType(nodesListGetNode(pInputs, 1));
}

X
Xiaoyu Wang 已提交
959 960 961 962 963
/**
 * Split-node finder for the single-table-join rule. When pNode needs to be
 * split, pInfo records the join, its child at index 1 as the split node, and
 * the subplan, and true is returned. Otherwise pInfo is left unchanged.
 */
static bool sigTbJoinSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
                                      SSigTbJoinSplitInfo* pInfo) {
  if (!sigTbJoinSplNeedSplit(pNode)) {
    return false;
  }

  pInfo->pJoin = (SJoinLogicNode*)pNode;
  pInfo->pSplitNode = (SLogicNode*)nodesListGetNode(pNode->pChildren, 1);
  pInfo->pSubplan = pSubplan;
  return true;
}

X
Xiaoyu Wang 已提交
970 971 972
/**
 * Split rule "SingleTableJoinSplit".
 *
 * When a matching join is found, an exchange node is created for the split
 * node (the subplan keeps its current type) and the split node is moved into
 * a new scan subplan appended to the subplan's children. After a match the
 * group id is advanced and pCxt->split is set, even if an error is returned.
 */
static int32_t singleTableJoinSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SSigTbJoinSplitInfo found = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)sigTbJoinSplFindSplitNode, &found)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code =
      splCreateExchangeNodeForSubplan(pCxt, found.pSubplan, found.pSplitNode, found.pSubplan->subplanType);
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, found.pSplitNode, 0);
    code = nodesListMakeStrictAppend(&found.pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}

984 985 986 987 988
/**
 * Tell whether the logic tree rooted at pLogicNode reads from the subplan
 * group groupId. Exchange and merge nodes are compared by their srcGroupId
 * and are not descended into; any other node is searched recursively through
 * its children.
 */
static bool unionIsChildSubplan(SLogicNode* pLogicNode, int32_t groupId) {
  switch (nodeType(pLogicNode)) {
    case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
      return ((SExchangeLogicNode*)pLogicNode)->srcGroupId == groupId;
    case QUERY_NODE_LOGIC_PLAN_MERGE:
      return ((SMergeLogicNode*)pLogicNode)->srcGroupId == groupId;
    default:
      break;
  }

  SNode* pSub = NULL;
  FOREACH(pSub, pLogicNode->pChildren) {
    if (unionIsChildSubplan((SLogicNode*)pSub, groupId)) {
      return true;
    }
  }
  return false;
}

/**
 * Move every subplan in pChildren whose group id is referenced by the logic
 * tree of pParent (see unionIsChildSubplan) into pParent->pChildren.
 *
 * @param pParent   subplan that receives the matching children
 * @param pChildren list of candidate subplans; matching entries are removed
 * @return TSDB_CODE_SUCCESS, or the error code of the failed list append
 */
static int32_t unionMountSubplan(SLogicSubplan* pParent, SNodeList* pChildren) {
  SNode* pChild = NULL;
  WHERE_EACH(pChild, pChildren) {
    if (unionIsChildSubplan(pParent->pNode, ((SLogicSubplan*)pChild)->id.groupId)) {
      int32_t code = nodesListMakeAppend(&pParent->pChildren, pChild);
      if (TSDB_CODE_SUCCESS == code) {
        // the subplan now belongs to pParent: clear the slot, then drop the cell
        REPLACE_NODE(NULL);
        ERASE_NODE(pChildren);
        // presumably ERASE_NODE already moves the cursor on, hence no WHERE_NEXT — TODO confirm
        continue;
      } else {
        return code;
      }
    }
    WHERE_NEXT;
  }
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1021
/**
 * Create a logic subplan rooted at pNode, using the query id and the current
 * group id of the split context and the given subplan type. pNode is
 * detached from its parent.
 *
 * @return the new subplan, or NULL when the node could not be created (in
 *         which case pNode is not modified)
 */
static SLogicSubplan* unionCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode, ESubplanType subplanType) {
  SLogicSubplan* pNew = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
  if (NULL != pNew) {
    pNew->id.queryId = pCxt->queryId;
    pNew->id.groupId = pCxt->groupId;
    pNew->subplanType = subplanType;
    pNew->pNode = pNode;
    pNode->pParent = NULL;
  }
  return pNew;
}

/**
 * Turn every child of pSplitNode into its own subplan below pUnionSubplan.
 *
 * The former children of pUnionSubplan are detached first; each one that is
 * referenced by a new subplan's logic tree is re-mounted under that new
 * subplan (see unionMountSubplan). On success the child list of pSplitNode
 * is destroyed.
 *
 * @param pCxt          split context supplying query id and group id
 * @param pUnionSubplan subplan that owns pSplitNode; receives the new subplans
 * @param pSplitNode    node whose children are moved into new subplans
 * @return TSDB_CODE_SUCCESS or the first error encountered
 */
static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubplan, SLogicNode* pSplitNode) {
  SNodeList* pSubplanChildren = pUnionSubplan->pChildren;
  pUnionSubplan->pChildren = NULL;

  int32_t code = TSDB_CODE_SUCCESS;

  SNode* pChild = NULL;
  FOREACH(pChild, pSplitNode->pChildren) {
    SLogicSubplan* pNewSubplan = unionCreateSubplan(pCxt, (SLogicNode*)pChild, pUnionSubplan->subplanType);
    if (NULL != pNewSubplan) {
      // The new subplan references the child from here on. Clear the slot
      // before the append so the child is never reachable from both the
      // subplan and pSplitNode if the append fails and discards the subplan.
      // NOTE(review): assumes the strict append destroys the node it was given on failure — confirm
      REPLACE_NODE(NULL);
    }
    code = nodesListMakeStrictAppend(&pUnionSubplan->pChildren, (SNode*)pNewSubplan);
    if (TSDB_CODE_SUCCESS == code) {
      code = unionMountSubplan(pNewSubplan, pSubplanChildren);
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }

  // The detached list is referenced by nothing else; release it on failure
  // as well, together with any subplan that was not re-mounted.
  nodesDestroyList(pSubplanChildren);
  if (TSDB_CODE_SUCCESS == code) {
    NODES_DESTORY_LIST(pSplitNode->pChildren);
  }
  return code;
}

X
Xiaoyu Wang 已提交
1059 1060 1061 1062 1063
// Result of the union-all split search (filled by unAllSplFindSplitNode).
typedef struct SUnionAllSplitInfo {
  SProjectLogicNode* pProject;  // project node that has more than one child
  SLogicSubplan*     pSubplan;  // the subplan in which the project node was found
} SUnionAllSplitInfo;

X
Xiaoyu Wang 已提交
1064 1065
/**
 * Split-node finder for the union-all rule: matches a project node that has
 * more than one child and records it, together with the subplan, in pInfo.
 */
static bool unAllSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
                                  SUnionAllSplitInfo* pInfo) {
  if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || LIST_LENGTH(pNode->pChildren) <= 1) {
    return false;
  }

  pInfo->pProject = (SProjectLogicNode*)pNode;
  pInfo->pSubplan = pSubplan;
  return true;
}

X
Xiaoyu Wang 已提交
1074
/**
 * Replace the union-all project node by an exchange node that reads from the
 * current group id of the split context.
 *
 * The exchange node copies the project's precision and targets and takes
 * over its limit. The subplan becomes a merge subplan. The project node is
 * destroyed once it has been replaced, either as the subplan root or in its
 * parent's child list.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the exchange node
 *         or its target list cannot be created, or
 *         TSDB_CODE_PLAN_INTERNAL_ERROR when the project node is not found
 *         among its parent's children
 */
static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SProjectLogicNode* pProject) {
  SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pExchange->srcGroupId = pCxt->groupId;
  pExchange->node.precision = pProject->node.precision;
  pExchange->node.pTargets = nodesCloneList(pProject->node.pTargets);
  if (NULL == pExchange->node.pTargets) {
    // the exchange node has not been attached anywhere yet: release it
    nodesDestroyNode((SNode*)pExchange);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  TSWAP(pExchange->node.pLimit, pProject->node.pLimit);

  pSubplan->subplanType = SUBPLAN_TYPE_MERGE;

  if (NULL == pProject->node.pParent) {
    // the project node is the root of the subplan
    pSubplan->pNode = (SLogicNode*)pExchange;
    nodesDestroyNode((SNode*)pProject);
    return TSDB_CODE_SUCCESS;
  }

  SNode* pNode;
  FOREACH(pNode, pProject->node.pParent->pChildren) {
    // NOTE(review): the project node is located with nodesEqualNode rather
    // than by pointer identity — confirm this cannot match a sibling
    if (nodesEqualNode(pNode, (SNode*)pProject)) {
      REPLACE_NODE(pExchange);
      nodesDestroyNode(pNode);
      return TSDB_CODE_SUCCESS;
    }
  }
  nodesDestroyNode((SNode*)pExchange);
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
}

X
Xiaoyu Wang 已提交
1107 1108
/**
 * Split rule "UnionAllSplit".
 *
 * When a project node with several children is found, each child is moved
 * into its own subplan and the project node is replaced by an exchange node.
 * After a match the group id is advanced and pCxt->split is set, even if an
 * error is returned.
 */
static int32_t unionAllSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SUnionAllSplitInfo found = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unAllSplFindSplitNode, &found)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = unionSplitSubplan(pCxt, found.pSubplan, (SLogicNode*)found.pProject);
  if (TSDB_CODE_SUCCESS == code) {
    code = unAllSplCreateExchangeNode(pCxt, found.pSubplan, found.pProject);
  }

  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}

X
Xiaoyu Wang 已提交
1122 1123 1124 1125 1126
// Result of the union-distinct split search (filled by unDistSplFindSplitNode).
typedef struct SUnionDistinctSplitInfo {
  SAggLogicNode* pAgg;      // aggregate node that has more than one child
  SLogicSubplan* pSubplan;  // the subplan in which the aggregate node was found
} SUnionDistinctSplitInfo;

X
Xiaoyu Wang 已提交
1127
/**
 * Create an exchange node that reads from the current group id of the split
 * context and append it to the children of the union-distinct aggregate.
 *
 * The exchange node copies the aggregate's precision and uses a clone of its
 * group keys as targets. The subplan becomes a merge subplan once the
 * exchange node has been built.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the exchange node
 *         or its target list cannot be created, or the error code of the
 *         failed list append
 */
static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SAggLogicNode* pAgg) {
  SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pExchange->srcGroupId = pCxt->groupId;
  pExchange->node.precision = pAgg->node.precision;
  pExchange->node.pTargets = nodesCloneList(pAgg->pGroupKeys);
  if (NULL == pExchange->node.pTargets) {
    // the exchange node has not been attached anywhere yet: release it
    nodesDestroyNode((SNode*)pExchange);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pSubplan->subplanType = SUBPLAN_TYPE_MERGE;

  int32_t code = nodesListMakeAppend(&pAgg->node.pChildren, (SNode*)pExchange);
  if (TSDB_CODE_SUCCESS != code) {
    // the append did not take the node, so it is still ours to free
    nodesDestroyNode((SNode*)pExchange);
  }
  return code;
}

X
Xiaoyu Wang 已提交
1144 1145 1146 1147
/**
 * Split-node finder for the union-distinct rule: matches an aggregate node
 * that has more than one child and records it, together with the subplan,
 * in pInfo.
 */
static bool unDistSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
                                   SUnionDistinctSplitInfo* pInfo) {
  if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || LIST_LENGTH(pNode->pChildren) <= 1) {
    return false;
  }

  pInfo->pAgg = (SAggLogicNode*)pNode;
  pInfo->pSubplan = pSubplan;
  return true;
}
X
Xiaoyu Wang 已提交
1154 1155
/**
 * Split rule "UnionDistinctSplit".
 *
 * When an aggregate node with several children is found, each child is moved
 * into its own subplan and an exchange node becomes the aggregate's input.
 * After a match the group id is advanced and pCxt->split is set, even if an
 * error is returned.
 */
static int32_t unionDistinctSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SUnionDistinctSplitInfo found = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unDistSplFindSplitNode, &found)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = unionSplitSubplan(pCxt, found.pSubplan, (SLogicNode*)found.pAgg);
  if (TSDB_CODE_SUCCESS == code) {
    code = unDistSplCreateExchangeNode(pCxt, found.pSubplan, found.pAgg);
  }

  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}

X
Xiaoyu Wang 已提交
1169 1170 1171 1172 1173
// Result of the SMA-index split search (filled by smaIdxSplFindSplitNode).
typedef struct SSmaIndexSplitInfo {
  SMergeLogicNode* pMerge;    // merge node that has more than one child
  SLogicSubplan*   pSubplan;  // the subplan in which the merge node was found
} SSmaIndexSplitInfo;

X
Xiaoyu Wang 已提交
1174 1175
/**
 * Split-node finder for the SMA-index rule: matches a merge node that has
 * more than one child and records it, together with the subplan, in pInfo.
 */
static bool smaIdxSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
                                   SSmaIndexSplitInfo* pInfo) {
  if (QUERY_NODE_LOGIC_PLAN_MERGE != nodeType(pNode) || LIST_LENGTH(pNode->pChildren) <= 1) {
    return false;
  }

  pInfo->pMerge = (SMergeLogicNode*)pNode;
  pInfo->pSubplan = pSubplan;
  return true;
}

/**
 * Split rule "SmaIndexSplit".
 *
 * When a merge node with several children is found, each child is moved
 * into its own subplan; on success the merge node's source group id is set
 * to the current group id. After a match the group id is advanced and
 * pCxt->split is set, even if an error is returned.
 */
static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SSmaIndexSplitInfo found = {0};
  bool               matched = splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)smaIdxSplFindSplitNode, &found);
  if (!matched) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = unionSplitSubplan(pCxt, found.pSubplan, (SLogicNode*)found.pMerge);
  if (TSDB_CODE_SUCCESS == code) {
    found.pMerge->srcGroupId = pCxt->groupId;
  }

  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}

X
Xiaoyu Wang 已提交
1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222
// Result of the qnode split search (filled by qndSplFindSplitNode).
typedef struct SQnodeSplitInfo {
  SLogicNode*    pSplitNode;  // scan node that has a parent node
  SLogicSubplan* pSubplan;    // the subplan in which the scan node was found
} SQnodeSplitInfo;

/**
 * Split-node finder for the qnode rule: matches a scan node that has a
 * parent node and records it, together with the subplan, in pInfo.
 */
static bool qndSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
                                SQnodeSplitInfo* pInfo) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode) || NULL == pNode->pParent) {
    return false;
  }

  pInfo->pSplitNode = pNode;
  pInfo->pSubplan = pSubplan;
  return true;
}

/**
 * Qnode split, applied only when tsQueryPolicy is QUERY_POLICY_QNODE.
 *
 * When a scan node with a parent is found, the scan is marked as requiring a
 * data load, an exchange node is created for it and the scan is moved into a
 * new scan subplan. If the subplan has a vgroup list, its vgroup count
 * becomes the number of compute nodes and the list is swapped into the scan
 * subplan; otherwise the number of compute nodes is 1.
 *
 * After a match the subplan becomes a compute subplan, the group id is
 * advanced and pCxt->split is set, even if an error is returned.
 */
static int32_t qnodeSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  if (QUERY_POLICY_QNODE != tsQueryPolicy) {
    return TSDB_CODE_SUCCESS;
  }

  SQnodeSplitInfo found = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)qndSplFindSplitNode, &found)) {
    return TSDB_CODE_SUCCESS;
  }

  ((SScanLogicNode*)found.pSplitNode)->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;

  int32_t code =
      splCreateExchangeNodeForSubplan(pCxt, found.pSubplan, found.pSplitNode, found.pSubplan->subplanType);
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, found.pSplitNode, 0);
    if (NULL == pScanSubplan) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      if (NULL == found.pSubplan->pVgroupList) {
        found.pSubplan->numOfComputeNodes = 1;
      } else {
        found.pSubplan->numOfComputeNodes = found.pSubplan->pVgroupList->numOfVgroups;
        TSWAP(pScanSubplan->pVgroupList, found.pSubplan->pVgroupList);
      }
      code = nodesListMakeStrictAppend(&found.pSubplan->pChildren, (SNode*)pScanSubplan);
    }
  }

  found.pSubplan->subplanType = SUBPLAN_TYPE_COMPUTE;
  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}

X
Xiaoyu Wang 已提交
1245 1246 1247 1248 1249
// clang-format off
static const SSplitRule splitRuleSet[] = {
  {.pName = "SuperTableSplit",      .splitFunc = stableSplit},
  {.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit},
  {.pName = "UnionAllSplit",        .splitFunc = unionAllSplit},
X
Xiaoyu Wang 已提交
1250
  {.pName = "UnionDistinctSplit",   .splitFunc = unionDistinctSplit},
1251
  {.pName = "SmaIndexSplit",        .splitFunc = smaIndexSplit}
X
Xiaoyu Wang 已提交
1252 1253
};
// clang-format on
X
Xiaoyu Wang 已提交
1254 1255 1256

static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));

1257 1258
/**
 * Write the string form of pSubplan to the debug log after a split rule has
 * been applied.
 *
 * @param pRuleName name of the rule that was applied
 * @param pSubplan  subplan to dump
 */
static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
  char* pStr = NULL;
  nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL);
  // pStr stays NULL when no string was produced; a NULL argument for %s is
  // undefined behavior, so skip the log line in that case
  if (NULL != pStr) {
    qDebugL("apply split %s rule: %s", pRuleName, pStr);
  }
  taosMemoryFree(pStr);
}

1264 1265 1266 1267
/**
 * Apply the rules of splitRuleSet to pSubplan over and over until a complete
 * pass performs no split, then run the qnode split once.
 *
 * The split context starts with the subplan's query id and the group id that
 * follows the subplan's own. Every rule that reports a split causes the
 * subplan to be dumped to the debug log.
 *
 * @return the first error returned by a rule, otherwise the result of
 *         qnodeSplit
 */
static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) {
  SSplitContext cxt = {
      .pPlanCxt = pCxt, .queryId = pSubplan->id.queryId, .groupId = pSubplan->id.groupId + 1, .split = false};

  bool anySplit = false;
  do {
    anySplit = false;
    for (int32_t ruleIdx = 0; ruleIdx < splitRuleNum; ++ruleIdx) {
      const SSplitRule* pRule = &splitRuleSet[ruleIdx];
      cxt.split = false;
      int32_t code = pRule->splitFunc(&cxt, pSubplan);
      if (TSDB_CODE_SUCCESS != code) {
        return code;
      }
      if (!cxt.split) {
        continue;
      }
      anySplit = true;
      dumpLogicSubplan(pRule->pName, pSubplan);
    }
  } while (anySplit);

  return qnodeSplit(&cxt, pSubplan);
}
X
Xiaoyu Wang 已提交
1284

X
Xiaoyu Wang 已提交
1285 1286 1287 1288 1289 1290 1291 1292 1293 1294
/**
 * Walk the logic tree below pNode; at every scan node reached, swap the
 * scan's vgroup list with the vgroup list of pSubplan. Scan nodes are not
 * descended into.
 */
static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) {
    SNode* pSub;
    FOREACH(pSub, pNode->pChildren) {
      setVgroupsInfo((SLogicNode*)pSub, pSubplan);
    }
    return;
  }

  TSWAP(((SScanLogicNode*)pNode)->pVgroupList, pSubplan->pVgroupList);
}

X
Xiaoyu Wang 已提交
1295 1296 1297 1298
/**
 * Entry point of the plan splitter.
 *
 * A subplan rooted at a vnode-modify node is not split; only the vgroup
 * lists of its scan nodes and of the subplan are swapped. Every other
 * subplan is run through the split rules.
 *
 * @return TSDB_CODE_SUCCESS or the error code from applySplitRule
 */
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
  if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY != nodeType(pLogicSubplan->pNode)) {
    return applySplitRule(pCxt, pLogicSubplan);
  }

  setVgroupsInfo(pLogicSubplan->pNode, pLogicSubplan);
  return TSDB_CODE_SUCCESS;
}