planSpliter.c 56.0 KB
Newer Older
X
Xiaoyu Wang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

X
Xiaoyu Wang 已提交
16
#include "functionMgt.h"
X
Xiaoyu Wang 已提交
17
#include "planInt.h"
X
Xiaoyu Wang 已提交
18
#include "tglobal.h"
X
Xiaoyu Wang 已提交
19

X
Xiaoyu Wang 已提交
20
#define SPLIT_FLAG_MASK(n) (1 << n)
X
Xiaoyu Wang 已提交
21

X
Xiaoyu Wang 已提交
22
#define SPLIT_FLAG_STABLE_SPLIT SPLIT_FLAG_MASK(0)
23
#define SPLIT_FLAG_INSERT_SPLIT SPLIT_FLAG_MASK(1)
X
Xiaoyu Wang 已提交
24

X
Xiaoyu Wang 已提交
25
#define SPLIT_FLAG_SET_MASK(val, mask)  (val) |= (mask)
X
Xiaoyu Wang 已提交
26 27 28
#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)

typedef struct SSplitContext {
29 30 31 32
  SPlanContext* pPlanCxt;
  uint64_t      queryId;
  int32_t       groupId;
  bool          split;
X
Xiaoyu Wang 已提交
33 34
} SSplitContext;

35
typedef int32_t (*FSplit)(SSplitContext* pCxt, SLogicSubplan* pSubplan);
X
Xiaoyu Wang 已提交
36 37

typedef struct SSplitRule {
X
Xiaoyu Wang 已提交
38
  char*  pName;
X
Xiaoyu Wang 已提交
39 40 41
  FSplit splitFunc;
} SSplitRule;

X
Xiaoyu Wang 已提交
42
typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, void* pInfo);
X
Xiaoyu Wang 已提交
43

X
Xiaoyu Wang 已提交
44 45 46 47 48 49 50 51 52 53 54
static void splSetSubplanVgroups(SLogicSubplan* pSubplan, SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pNode)->pVgroupList);
  } else {
    if (1 == LIST_LENGTH(pNode->pChildren)) {
      splSetSubplanVgroups(pSubplan, (SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
    }
  }
}

static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SLogicNode* pNode, int32_t flag) {
55
  SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
X
Xiaoyu Wang 已提交
56 57 58
  if (NULL == pSubplan) {
    return NULL;
  }
59
  pSubplan->id.queryId = pCxt->queryId;
X
Xiaoyu Wang 已提交
60 61
  pSubplan->id.groupId = pCxt->groupId;
  pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
X
Xiaoyu Wang 已提交
62 63 64
  pSubplan->pNode = pNode;
  pSubplan->pNode->pParent = NULL;
  splSetSubplanVgroups(pSubplan, pNode);
65
  SPLIT_FLAG_SET_MASK(pSubplan->splitFlag, flag);
X
Xiaoyu Wang 已提交
66 67 68
  return pSubplan;
}

X
Xiaoyu Wang 已提交
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
static bool splHasScan(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    return true;
  }

  SNode* pChild = NULL;
  FOREACH(pChild, pNode->pChildren) {
    if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild)) {
      return true;
    }
    return splHasScan((SLogicNode*)pChild);
  }

  return false;
}

static void splSetSubplanType(SLogicSubplan* pSubplan) {
  pSubplan->subplanType = splHasScan(pSubplan->pNode) ? SUBPLAN_TYPE_SCAN : SUBPLAN_TYPE_MERGE;
}

static SLogicSubplan* splCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode) {
90 91 92 93 94 95 96 97
  SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
  if (NULL == pSubplan) {
    return NULL;
  }
  pSubplan->id.queryId = pCxt->queryId;
  pSubplan->id.groupId = pCxt->groupId;
  pSubplan->pNode = pNode;
  pNode->pParent = NULL;
X
Xiaoyu Wang 已提交
98
  splSetSubplanType(pSubplan);
99 100 101
  return pSubplan;
}

102
static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SExchangeLogicNode** pOutput) {
103
  SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
X
Xiaoyu Wang 已提交
104 105 106
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
107

108 109
  pExchange->srcStartGroupId = pCxt->groupId;
  pExchange->srcEndGroupId = pCxt->groupId;
110 111
  pExchange->node.precision = pChild->precision;
  pExchange->node.pTargets = nodesCloneList(pChild->pTargets);
X
Xiaoyu Wang 已提交
112 113 114
  if (NULL == pExchange->node.pTargets) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
115 116 117 118 119
  if (NULL != pChild->pLimit) {
    pExchange->node.pLimit = nodesCloneNode(pChild->pLimit);
    if (NULL == pExchange->node.pLimit) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
120
    ((SLimitNode*)pChild->pLimit)->limit += ((SLimitNode*)pChild->pLimit)->offset;
121 122
    ((SLimitNode*)pChild->pLimit)->offset = 0;
  }
X
Xiaoyu Wang 已提交
123

124 125 126 127
  *pOutput = pExchange;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
128 129 130 131 132
static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
                                               ESubplanType subplanType) {
  SExchangeLogicNode* pExchange = NULL;
  int32_t             code = splCreateExchangeNode(pCxt, pSplitNode, &pExchange);
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
133
    code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pExchange);
X
Xiaoyu Wang 已提交
134 135 136 137
  }
  if (TSDB_CODE_SUCCESS == code) {
    pSubplan->subplanType = subplanType;
  } else {
138
    nodesDestroyNode((SNode*)pExchange);
X
Xiaoyu Wang 已提交
139 140
  }
  return code;
X
Xiaoyu Wang 已提交
141 142
}

143 144
static bool splIsChildSubplan(SLogicNode* pLogicNode, int32_t groupId) {
  if (QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(pLogicNode)) {
145 146
    return groupId >= ((SExchangeLogicNode*)pLogicNode)->srcStartGroupId &&
           groupId <= ((SExchangeLogicNode*)pLogicNode)->srcEndGroupId;
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
  }

  if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pLogicNode)) {
    return ((SMergeLogicNode*)pLogicNode)->srcGroupId == groupId;
  }

  SNode* pChild;
  FOREACH(pChild, pLogicNode->pChildren) {
    bool isChild = splIsChildSubplan((SLogicNode*)pChild, groupId);
    if (isChild) {
      return isChild;
    }
  }
  return false;
}

163 164 165
static int32_t splMountSubplan(SLogicSubplan* pParent, SNodeList* pChildren) {
  SNode* pChild = NULL;
  WHERE_EACH(pChild, pChildren) {
166
    if (splIsChildSubplan(pParent->pNode, ((SLogicSubplan*)pChild)->id.groupId)) {
167 168 169 170 171 172 173 174 175 176 177 178 179 180
      int32_t code = nodesListMakeAppend(&pParent->pChildren, pChild);
      if (TSDB_CODE_SUCCESS == code) {
        REPLACE_NODE(NULL);
        ERASE_NODE(pChildren);
        continue;
      } else {
        return code;
      }
    }
    WHERE_NEXT;
  }
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
181 182 183 184 185 186 187 188 189 190 191 192 193 194
static bool splMatchByNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, FSplFindSplitNode func,
                           void* pInfo) {
  if (func(pCxt, pSubplan, pNode, pInfo)) {
    return true;
  }
  SNode* pChild;
  FOREACH(pChild, pNode->pChildren) {
    if (splMatchByNode(pCxt, pSubplan, (SLogicNode*)pChild, func, pInfo)) {
      return true;
    }
  }
  return NULL;
}

195 196
static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag, FSplFindSplitNode func, void* pInfo) {
  if (!SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, flag)) {
X
Xiaoyu Wang 已提交
197
    if (splMatchByNode(pCxt, pSubplan, pSubplan->pNode, func, pInfo)) {
198 199 200 201 202 203 204 205 206 207 208 209
      return true;
    }
  }
  SNode* pChild;
  FOREACH(pChild, pSubplan->pChildren) {
    if (splMatch(pCxt, (SLogicSubplan*)pChild, flag, func, pInfo)) {
      return true;
    }
  }
  return false;
}

X
Xiaoyu Wang 已提交
210 211 212 213 214
static void splSetParent(SLogicNode* pNode) {
  SNode* pChild = NULL;
  FOREACH(pChild, pNode->pChildren) { ((SLogicNode*)pChild)->pParent = pNode; }
}

X
Xiaoyu Wang 已提交
215
typedef struct SStableSplitInfo {
X
Xiaoyu Wang 已提交
216 217
  SLogicNode*    pSplitNode;
  SLogicSubplan* pSubplan;
X
Xiaoyu Wang 已提交
218 219
} SStableSplitInfo;

X
Xiaoyu Wang 已提交
220 221 222
static bool stbSplHasGatherExecFunc(const SNodeList* pFuncs) {
  SNode* pFunc = NULL;
  FOREACH(pFunc, pFuncs) {
223 224
    if (!fmIsWindowPseudoColumnFunc(((SFunctionNode*)pFunc)->funcId) &&
        !fmIsDistExecFunc(((SFunctionNode*)pFunc)->funcId)) {
X
Xiaoyu Wang 已提交
225 226 227 228 229 230
      return true;
    }
  }
  return false;
}

231
static bool stbSplIsMultiTbScan(bool streamQuery, SScanLogicNode* pScan) {
5
54liuyao 已提交
232
  return (NULL != pScan->pVgroupList && pScan->pVgroupList->numOfVgroups > 1);
X
Xiaoyu Wang 已提交
233 234
}

235
static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) {
X
Xiaoyu Wang 已提交
236 237 238 239
  if (1 != LIST_LENGTH(pNode->pChildren)) {
    return false;
  }
  SNode* pChild = nodesListGetNode(pNode->pChildren, 0);
X
Xiaoyu Wang 已提交
240 241 242 243 244 245
  if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pChild)) {
    if (1 != LIST_LENGTH(((SLogicNode*)pChild)->pChildren)) {
      return false;
    }
    pChild = nodesListGetNode(((SLogicNode*)pChild)->pChildren, 0);
  }
246
  return (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild));
X
Xiaoyu Wang 已提交
247 248
}

249 250 251 252 253 254 255 256
static bool stbSplIsMultiTbScanChild(bool streamQuery, SLogicNode* pNode) {
  if (1 != LIST_LENGTH(pNode->pChildren)) {
    return false;
  }
  SNode* pChild = nodesListGetNode(pNode->pChildren, 0);
  return (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild));
}

257 258 259 260 261 262 263 264 265 266 267 268 269
static bool stbSplNeedSplitWindow(bool streamQuery, SLogicNode* pNode) {
  SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode;
  if (WINDOW_TYPE_INTERVAL == pWindow->winType) {
    return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
  }

  if (WINDOW_TYPE_SESSION == pWindow->winType) {
    if (!streamQuery) {
      return stbSplHasMultiTbScan(streamQuery, pNode);
    } else {
      return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
    }
  }
X
Xiaoyu Wang 已提交
270

271 272 273 274 275 276 277 278 279 280 281
  if (WINDOW_TYPE_STATE == pWindow->winType) {
    if (!streamQuery) {
      return stbSplHasMultiTbScan(streamQuery, pNode);
    } else {
      return false;
    }
  }

  return false;
}

282 283 284 285 286 287 288 289 290 291 292 293 294
static bool stbSplNeedSplitJoin(bool streamQuery, SJoinLogicNode* pJoin) {
  if (pJoin->isSingleTableJoin) {
    return false;
  }
  SNode* pChild = NULL;
  FOREACH(pChild, pJoin->node.pChildren) {
    if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild) && QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pChild)) {
      return false;
    }
  }
  return true;
}

295 296 297 298 299 300 301 302 303 304 305 306 307 308
static bool stbSplIsTableCountQuery(SLogicNode* pNode) {
  if (1 != LIST_LENGTH(pNode->pChildren)) {
    return false;
  }
  SNode* pChild = nodesListGetNode(pNode->pChildren, 0);
  if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pChild)) {
    if (1 != LIST_LENGTH(((SLogicNode*)pChild)->pChildren)) {
      return false;
    }
    pChild = nodesListGetNode(((SLogicNode*)pChild)->pChildren, 0);
  }
  return QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && SCAN_TYPE_TABLE_COUNT == ((SScanLogicNode*)pChild)->scanType;
}

X
Xiaoyu Wang 已提交
309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345
static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    return ((SScanLogicNode*)pNode)->pGroupTags;
  } else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
    return ((SPartitionLogicNode*)pNode)->pPartitionKeys;
  } else {
    return NULL;
  }
}

static bool stbSplHasPartTbname(SNodeList* pPartKeys) {
  if (NULL == pPartKeys) {
    return false;
  }
  SNode* pPartKey = NULL;
  FOREACH(pPartKey, pPartKeys) {
    if (QUERY_NODE_GROUPING_SET == nodeType(pPartKey)) {
      pPartKey = nodesListGetNode(((SGroupingSetNode*)pPartKey)->pParameterList, 0);
    }
    if ((QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType) ||
        (QUERY_NODE_COLUMN == nodeType(pPartKey) && COLUMN_TYPE_TBNAME == ((SColumnNode*)pPartKey)->colType)) {
      return true;
    }
  }
  return false;
}

static bool stbSplIsPartTableAgg(SAggLogicNode* pAgg) {
  if (NULL != pAgg->pGroupKeys) {
    return stbSplHasPartTbname(pAgg->pGroupKeys);
  }
  if (1 != LIST_LENGTH(pAgg->node.pChildren)) {
    return false;
  }
  return stbSplHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)));
}

346
static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
X
Xiaoyu Wang 已提交
347
  switch (nodeType(pNode)) {
X
Xiaoyu Wang 已提交
348
    case QUERY_NODE_LOGIC_PLAN_SCAN:
349
      return streamQuery ? false : stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
X
Xiaoyu Wang 已提交
350
    case QUERY_NODE_LOGIC_PLAN_JOIN:
351
      return stbSplNeedSplitJoin(streamQuery, (SJoinLogicNode*)pNode);
352
    case QUERY_NODE_LOGIC_PLAN_PARTITION:
X
Xiaoyu Wang 已提交
353
      return streamQuery ? false : stbSplIsMultiTbScanChild(streamQuery, pNode);
X
Xiaoyu Wang 已提交
354
    case QUERY_NODE_LOGIC_PLAN_AGG:
X
Xiaoyu Wang 已提交
355 356
      return (!stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) ||
              stbSplIsPartTableAgg((SAggLogicNode*)pNode)) &&
X
Xiaoyu Wang 已提交
357
             stbSplHasMultiTbScan(streamQuery, pNode) && !stbSplIsTableCountQuery(pNode);
358 359
    case QUERY_NODE_LOGIC_PLAN_WINDOW:
      return stbSplNeedSplitWindow(streamQuery, pNode);
360 361
    case QUERY_NODE_LOGIC_PLAN_SORT:
      return stbSplHasMultiTbScan(streamQuery, pNode);
X
Xiaoyu Wang 已提交
362 363 364 365 366 367
    default:
      break;
  }
  return false;
}

X
Xiaoyu Wang 已提交
368 369 370 371
static bool stbSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
                                SStableSplitInfo* pInfo) {
  if (stbSplNeedSplit(pCxt->pPlanCxt->streamQuery, pNode)) {
    pInfo->pSplitNode = pNode;
372
    pInfo->pSubplan = pSubplan;
X
Xiaoyu Wang 已提交
373
    return true;
374
  }
X
Xiaoyu Wang 已提交
375
  return false;
376 377
}

X
Xiaoyu Wang 已提交
378 379 380 381 382 383 384 385
static int32_t stbSplRewriteFuns(const SNodeList* pFuncs, SNodeList** pPartialFuncs, SNodeList** pMergeFuncs) {
  SNode* pNode = NULL;
  FOREACH(pNode, pFuncs) {
    SFunctionNode* pFunc = (SFunctionNode*)pNode;
    SFunctionNode* pPartFunc = NULL;
    SFunctionNode* pMergeFunc = NULL;
    int32_t        code = TSDB_CODE_SUCCESS;
    if (fmIsWindowPseudoColumnFunc(pFunc->funcId)) {
386 387
      pPartFunc = (SFunctionNode*)nodesCloneNode(pNode);
      pMergeFunc = (SFunctionNode*)nodesCloneNode(pNode);
X
Xiaoyu Wang 已提交
388
      if (NULL == pPartFunc || NULL == pMergeFunc) {
389 390
        nodesDestroyNode((SNode*)pPartFunc);
        nodesDestroyNode((SNode*)pMergeFunc);
X
Xiaoyu Wang 已提交
391 392 393 394 395 396
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    } else {
      code = fmGetDistMethod(pFunc, &pPartFunc, &pMergeFunc);
    }
    if (TSDB_CODE_SUCCESS == code) {
397
      code = nodesListMakeStrictAppend(pPartialFuncs, (SNode*)pPartFunc);
X
Xiaoyu Wang 已提交
398 399
    }
    if (TSDB_CODE_SUCCESS == code) {
400
      code = nodesListMakeStrictAppend(pMergeFuncs, (SNode*)pMergeFunc);
X
Xiaoyu Wang 已提交
401 402 403 404 405 406 407 408 409 410 411 412 413 414
    }
    if (TSDB_CODE_SUCCESS != code) {
      nodesDestroyList(*pPartialFuncs);
      nodesDestroyList(*pMergeFuncs);
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

static int32_t stbSplAppendWStart(SNodeList* pFuncs, int32_t* pIndex) {
  int32_t index = 0;
  SNode*  pFunc = NULL;
  FOREACH(pFunc, pFuncs) {
415
    if (FUNCTION_TYPE_WSTART == ((SFunctionNode*)pFunc)->funcType) {
X
Xiaoyu Wang 已提交
416 417 418 419 420 421
      *pIndex = index;
      return TSDB_CODE_SUCCESS;
    }
    ++index;
  }

422
  SFunctionNode* pWStart = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
X
Xiaoyu Wang 已提交
423 424 425
  if (NULL == pWStart) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
426
  strcpy(pWStart->functionName, "_wstart");
X
Xiaoyu Wang 已提交
427 428
  int64_t pointer = (int64_t)pWStart;
  snprintf(pWStart->node.aliasName, sizeof(pWStart->node.aliasName), "%s.%" PRId64 "", pWStart->functionName, pointer);
X
Xiaoyu Wang 已提交
429 430
  int32_t code = fmGetFuncInfo(pWStart, NULL, 0);
  if (TSDB_CODE_SUCCESS == code) {
431
    code = nodesListStrictAppend(pFuncs, (SNode*)pWStart);
X
Xiaoyu Wang 已提交
432 433 434 435 436
  }
  *pIndex = index;
  return code;
}

5
54liuyao 已提交
437 438 439 440
static int32_t stbSplAppendWEnd(SWindowLogicNode* pWin, int32_t* pIndex) {
  int32_t index = 0;
  SNode*  pFunc = NULL;
  FOREACH(pFunc, pWin->pFuncs) {
441
    if (FUNCTION_TYPE_WEND == ((SFunctionNode*)pFunc)->funcType) {
5
54liuyao 已提交
442 443 444 445 446 447 448 449 450 451
      *pIndex = index;
      return TSDB_CODE_SUCCESS;
    }
    ++index;
  }

  SFunctionNode* pWEnd = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
  if (NULL == pWEnd) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
452
  strcpy(pWEnd->functionName, "_wend");
X
Xiaoyu Wang 已提交
453 454
  int64_t pointer = (int64_t)pWEnd;
  snprintf(pWEnd->node.aliasName, sizeof(pWEnd->node.aliasName), "%s.%" PRId64 "", pWEnd->functionName, pointer);
5
54liuyao 已提交
455 456 457 458 459 460 461 462 463 464 465
  int32_t code = fmGetFuncInfo(pWEnd, NULL, 0);
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListStrictAppend(pWin->pFuncs, (SNode*)pWEnd);
  }
  *pIndex = index;
  if (TSDB_CODE_SUCCESS == code) {
    code = createColumnByRewriteExpr(nodesListGetNode(pWin->pFuncs, index), &pWin->node.pTargets);
  }
  return code;
}

X
Xiaoyu Wang 已提交
466 467 468 469 470 471 472
static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogicNode** pPartWindow) {
  SNodeList* pFunc = pMergeWindow->pFuncs;
  pMergeWindow->pFuncs = NULL;
  SNodeList* pTargets = pMergeWindow->node.pTargets;
  pMergeWindow->node.pTargets = NULL;
  SNodeList* pChildren = pMergeWindow->node.pChildren;
  pMergeWindow->node.pChildren = NULL;
X
Xiaoyu Wang 已提交
473 474
  SNode* pConditions = pMergeWindow->node.pConditions;
  pMergeWindow->node.pConditions = NULL;
X
Xiaoyu Wang 已提交
475

476
  SWindowLogicNode* pPartWin = (SWindowLogicNode*)nodesCloneNode((SNode*)pMergeWindow);
X
Xiaoyu Wang 已提交
477
  if (NULL == pPartWin) {
X
Xiaoyu Wang 已提交
478
    return TSDB_CODE_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
479 480
  }

481
  pPartWin->node.groupAction = GROUP_ACTION_KEEP;
X
Xiaoyu Wang 已提交
482 483 484 485
  pMergeWindow->node.pTargets = pTargets;
  pMergeWindow->node.pConditions = pConditions;
  pPartWin->node.pChildren = pChildren;
  splSetParent((SLogicNode*)pPartWin);
486

X
Xiaoyu Wang 已提交
487
  int32_t index = 0;
X
Xiaoyu Wang 已提交
488
  int32_t code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, &pMergeWindow->pFuncs);
X
Xiaoyu Wang 已提交
489 490 491 492
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplAppendWStart(pPartWin->pFuncs, &index);
  }
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
493
    code = createColumnByRewriteExprs(pPartWin->pFuncs, &pPartWin->node.pTargets);
X
Xiaoyu Wang 已提交
494 495 496 497 498 499 500 501 502 503 504 505 506
  }
  if (TSDB_CODE_SUCCESS == code) {
    nodesDestroyNode(pMergeWindow->pTspk);
    pMergeWindow->pTspk = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index));
    if (NULL == pMergeWindow->pTspk) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  nodesDestroyList(pFunc);
  if (TSDB_CODE_SUCCESS == code) {
    *pPartWindow = (SLogicNode*)pPartWin;
  } else {
507
    nodesDestroyNode((SNode*)pPartWin);
X
Xiaoyu Wang 已提交
508 509 510 511 512
  }

  return code;
}

X
Xiaoyu Wang 已提交
513 514 515 516 517 518 519 520 521 522 523
static int32_t stbSplGetNumOfVgroups(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    return ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups;
  } else {
    if (1 == LIST_LENGTH(pNode->pChildren)) {
      return stbSplGetNumOfVgroups((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
    }
  }
  return 0;
}

X
Xiaoyu Wang 已提交
524
static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
525
                                     SNodeList* pMergeKeys, SLogicNode* pPartChild, bool groupSort) {
526
  SMergeLogicNode* pMerge = (SMergeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE);
X
Xiaoyu Wang 已提交
527 528 529
  if (NULL == pMerge) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
530
  pMerge->numOfChannels = stbSplGetNumOfVgroups(pPartChild);
X
Xiaoyu Wang 已提交
531 532
  pMerge->srcGroupId = pCxt->groupId;
  pMerge->node.precision = pPartChild->precision;
X
Xiaoyu Wang 已提交
533
  pMerge->pMergeKeys = pMergeKeys;
534
  pMerge->groupSort = groupSort;
X
Xiaoyu Wang 已提交
535 536 537

  int32_t code = TSDB_CODE_SUCCESS;
  pMerge->pInputs = nodesCloneList(pPartChild->pTargets);
X
Xiaoyu Wang 已提交
538
  // NULL != pSubplan means 'merge node' replaces 'split node'.
539 540 541 542 543
  if (NULL == pSubplan) {
    pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets);
  } else {
    pMerge->node.pTargets = nodesCloneList(pSplitNode->pTargets);
  }
X
Xiaoyu Wang 已提交
544 545 546
  if (NULL == pMerge->node.pTargets || NULL == pMerge->pInputs) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
547 548 549 550 551 552
  if (TSDB_CODE_SUCCESS == code && NULL != pSplitNode->pLimit) {
    pMerge->node.pLimit = nodesCloneNode(pSplitNode->pLimit);
    if (NULL == pMerge->node.pLimit) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }
X
Xiaoyu Wang 已提交
553 554
  if (TSDB_CODE_SUCCESS == code) {
    if (NULL == pSubplan) {
555
      code = nodesListMakeAppend(&pSplitNode->pChildren, (SNode*)pMerge);
X
Xiaoyu Wang 已提交
556
    } else {
X
Xiaoyu Wang 已提交
557
      code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge);
X
Xiaoyu Wang 已提交
558 559 560
    }
  }
  if (TSDB_CODE_SUCCESS != code) {
561
    nodesDestroyNode((SNode*)pMerge);
X
Xiaoyu Wang 已提交
562
  }
X
Xiaoyu Wang 已提交
563
  return code;
X
Xiaoyu Wang 已提交
564 565
}

X
Xiaoyu Wang 已提交
566 567 568 569
static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent, SLogicNode* pPartChild) {
  SExchangeLogicNode* pExchange = NULL;
  int32_t             code = splCreateExchangeNode(pCxt, pPartChild, &pExchange);
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
570
    pExchange->node.pParent = pParent;
571
    code = nodesListMakeAppend(&pParent->pChildren, (SNode*)pExchange);
X
Xiaoyu Wang 已提交
572 573 574 575
  }
  return code;
}

576
static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, EOrder order, SNodeList** pMergeKeys) {
577
  SOrderByExprNode* pMergeKey = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
578 579 580
  if (NULL == pMergeKey) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
581
  pMergeKey->pExpr = nodesCloneNode(pPrimaryKey);
582
  if (NULL == pMergeKey->pExpr) {
583
    nodesDestroyNode((SNode*)pMergeKey);
584 585
    return TSDB_CODE_OUT_OF_MEMORY;
  }
586
  pMergeKey->order = order;
587
  pMergeKey->nullOrder = NULL_ORDER_FIRST;
588
  return nodesListMakeStrictAppend(pMergeKeys, (SNode*)pMergeKey);
589 590
}

591
static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
X
Xiaoyu Wang 已提交
592 593 594
  SLogicNode* pPartWindow = NULL;
  int32_t     code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
  if (TSDB_CODE_SUCCESS == code) {
595 596
    ((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_HASH;
    ((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_MERGE;
X
Xiaoyu Wang 已提交
597
    SNodeList* pMergeKeys = NULL;
598
    code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk,
X
Xiaoyu Wang 已提交
599
                                             ((SWindowLogicNode*)pInfo->pSplitNode)->outputTsOrder, &pMergeKeys);
X
Xiaoyu Wang 已提交
600
    if (TSDB_CODE_SUCCESS == code) {
601
      code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, true);
X
Xiaoyu Wang 已提交
602 603 604 605
    }
    if (TSDB_CODE_SUCCESS != code) {
      nodesDestroyList(pMergeKeys);
    }
X
Xiaoyu Wang 已提交
606 607 608
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
609
                                     (SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
X
Xiaoyu Wang 已提交
610 611
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
612
  ++(pCxt->groupId);
X
Xiaoyu Wang 已提交
613 614 615
  return code;
}

616
static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
617 618 619
  SLogicNode* pPartWindow = NULL;
  int32_t     code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
  if (TSDB_CODE_SUCCESS == code) {
620 621
    ((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_STREAM_SEMI;
    ((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_STREAM_FINAL;
622 623 624 625
    code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
626
                                     (SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
627 628
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
629
  ++(pCxt->groupId);
630 631 632
  return code;
}

633
static int32_t stbSplSplitInterval(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
634
  if (pCxt->pPlanCxt->streamQuery) {
635
    return stbSplSplitIntervalForStream(pCxt, pInfo);
636
  } else {
637
    return stbSplSplitIntervalForBatch(pCxt, pInfo);
638 639 640
  }
}

641 642 643 644
static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pPartWindow = NULL;
  int32_t     code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
  if (TSDB_CODE_SUCCESS == code) {
5
54liuyao 已提交
645 646 647 648 649 650 651
    SWindowLogicNode* pPartWin = (SWindowLogicNode*)pPartWindow;
    SWindowLogicNode* pMergeWin = (SWindowLogicNode*)pInfo->pSplitNode;
    pPartWin->windowAlgo = SESSION_ALGO_STREAM_SEMI;
    pMergeWin->windowAlgo = SESSION_ALGO_STREAM_FINAL;
    int32_t index = 0;
    int32_t code = stbSplAppendWEnd(pPartWin, &index);
    if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
652
      nodesDestroyNode(pMergeWin->pTsEnd);
5
54liuyao 已提交
653 654 655 656 657
      pMergeWin->pTsEnd = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index));
      if (NULL == pMergeWin->pTsEnd) {
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    }
658 659 660 661 662 663 664 665 666 667 668
    code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
                                     (SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}

669
static void stbSplSetTableMergeScan(SLogicNode* pNode) {
670
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
671 672 673 674 675
    SScanLogicNode* pScan = (SScanLogicNode*)pNode;
    pScan->scanType = SCAN_TYPE_TABLE_MERGE;
    if (NULL != pScan->pGroupTags) {
      pScan->groupSort = true;
    }
676 677
  } else {
    if (1 == LIST_LENGTH(pNode->pChildren)) {
678
      stbSplSetTableMergeScan((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
679 680 681 682 683 684 685 686 687
    }
  }
}

static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pWindow = pInfo->pSplitNode;
  SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pWindow->pChildren, 0);

  SNodeList* pMergeKeys = NULL;
688 689
  int32_t    code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk,
                                                      ((SWindowLogicNode*)pWindow)->inputTsOrder, &pMergeKeys);
690 691

  if (TSDB_CODE_SUCCESS == code) {
692
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild, true);
693 694 695 696 697 698 699 700
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
                                     (SNode*)splCreateScanSubplan(pCxt, pChild, SPLIT_FLAG_STABLE_SPLIT));
  }

  if (TSDB_CODE_SUCCESS == code) {
701
    stbSplSetTableMergeScan(pChild);
702 703
    pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
    SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
704
    ++(pCxt->groupId);
705 706 707 708 709 710 711
  } else {
    nodesDestroyList(pMergeKeys);
  }

  return code;
}

712
static int32_t stbSplSplitSession(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
713 714 715
  if (pCxt->pPlanCxt->streamQuery) {
    return stbSplSplitSessionForStream(pCxt, pInfo);
  } else {
716 717 718 719 720 721 722 723 724 725 726 727 728
    return stbSplSplitSessionOrStateForBatch(pCxt, pInfo);
  }
}

static int32_t stbSplSplitStateForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
}

static int32_t stbSplSplitState(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  if (pCxt->pPlanCxt->streamQuery) {
    return stbSplSplitStateForStream(pCxt, pInfo);
  } else {
    return stbSplSplitSessionOrStateForBatch(pCxt, pInfo);
729
  }
730 731
}

732
static bool stbSplIsPartTableWinodw(SWindowLogicNode* pWindow) {
X
Xiaoyu Wang 已提交
733
  return stbSplHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0)));
X
Xiaoyu Wang 已提交
734 735
}

736
static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
737 738 739 740 741
  switch (((SWindowLogicNode*)pInfo->pSplitNode)->winType) {
    case WINDOW_TYPE_INTERVAL:
      return stbSplSplitInterval(pCxt, pInfo);
    case WINDOW_TYPE_SESSION:
      return stbSplSplitSession(pCxt, pInfo);
742 743
    case WINDOW_TYPE_STATE:
      return stbSplSplitState(pCxt, pInfo);
744 745 746 747 748 749
    default:
      break;
  }
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
}

X
Xiaoyu Wang 已提交
750 751 752 753 754 755 756 757 758 759 760
static bool stbSplNeedSeqRecvData(SLogicNode* pNode) {
  if (NULL == pNode) {
    return false;
  }

  if (NULL != pNode->pLimit || NULL != pNode->pSlimit) {
    return true;
  }
  return stbSplNeedSeqRecvData(pNode->pParent);
}

761
static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
X
Xiaoyu Wang 已提交
762 763 764 765 766
  if (pCxt->pPlanCxt->streamQuery) {
    SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
    return TSDB_CODE_SUCCESS;
  }

767 768 769
  if (NULL != pInfo->pSplitNode->pParent && QUERY_NODE_LOGIC_PLAN_FILL == nodeType(pInfo->pSplitNode->pParent)) {
    pInfo->pSplitNode = pInfo->pSplitNode->pParent;
  }
X
Xiaoyu Wang 已提交
770 771 772 773 774 775
  SExchangeLogicNode* pExchange = NULL;
  int32_t             code = splCreateExchangeNode(pCxt, pInfo->pSplitNode, &pExchange);
  if (TSDB_CODE_SUCCESS == code) {
    code = replaceLogicNode(pInfo->pSubplan, pInfo->pSplitNode, (SLogicNode*)pExchange);
  }
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
776
    pExchange->seqRecvData = stbSplNeedSeqRecvData((SLogicNode*)pExchange);
X
Xiaoyu Wang 已提交
777 778 779 780 781 782 783 784 785
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
                                     (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}

static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
786 787
  if (stbSplIsPartTableWinodw((SWindowLogicNode*)pInfo->pSplitNode)) {
    return stbSplSplitWindowForPartTable(pCxt, pInfo);
X
Xiaoyu Wang 已提交
788
  } else {
789
    return stbSplSplitWindowForCrossTable(pCxt, pInfo);
X
Xiaoyu Wang 已提交
790 791 792
  }
}

X
Xiaoyu Wang 已提交
793 794 795 796 797 798 799 800 801
static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) {
  SNodeList* pFunc = pMergeAgg->pAggFuncs;
  pMergeAgg->pAggFuncs = NULL;
  SNodeList* pGroupKeys = pMergeAgg->pGroupKeys;
  pMergeAgg->pGroupKeys = NULL;
  SNodeList* pTargets = pMergeAgg->node.pTargets;
  pMergeAgg->node.pTargets = NULL;
  SNodeList* pChildren = pMergeAgg->node.pChildren;
  pMergeAgg->node.pChildren = NULL;
X
Xiaoyu Wang 已提交
802 803
  SNode* pConditions = pMergeAgg->node.pConditions;
  pMergeAgg->node.pConditions = NULL;
X
Xiaoyu Wang 已提交
804

805
  SAggLogicNode* pPartAgg = (SAggLogicNode*)nodesCloneNode((SNode*)pMergeAgg);
X
Xiaoyu Wang 已提交
806
  if (NULL == pPartAgg) {
X
Xiaoyu Wang 已提交
807
    return TSDB_CODE_OUT_OF_MEMORY;
X
Xiaoyu Wang 已提交
808 809
  }

810 811
  pPartAgg->node.groupAction = GROUP_ACTION_KEEP;

X
Xiaoyu Wang 已提交
812 813 814
  int32_t code = TSDB_CODE_SUCCESS;

  if (NULL != pGroupKeys) {
X
Xiaoyu Wang 已提交
815
    pPartAgg->pGroupKeys = pGroupKeys;
X
Xiaoyu Wang 已提交
816
    code = createColumnByRewriteExprs(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets);
X
Xiaoyu Wang 已提交
817 818 819 820 821 822 823 824
  }
  if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
    pMergeAgg->pGroupKeys = nodesCloneList(pPartAgg->node.pTargets);
    if (NULL == pMergeAgg->pGroupKeys) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
825
    pMergeAgg->node.pConditions = pConditions;
X
Xiaoyu Wang 已提交
826 827
    pMergeAgg->node.pTargets = pTargets;
    pPartAgg->node.pChildren = pChildren;
X
Xiaoyu Wang 已提交
828
    splSetParent((SLogicNode*)pPartAgg);
X
Xiaoyu Wang 已提交
829 830 831 832

    code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, &pMergeAgg->pAggFuncs);
  }
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
833
    code = createColumnByRewriteExprs(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets);
X
Xiaoyu Wang 已提交
834 835 836 837 838 839
  }

  nodesDestroyList(pFunc);
  if (TSDB_CODE_SUCCESS == code) {
    *pOutput = (SLogicNode*)pPartAgg;
  } else {
840
    nodesDestroyNode((SNode*)pPartAgg);
X
Xiaoyu Wang 已提交
841 842 843 844 845
  }

  return code;
}

X
Xiaoyu Wang 已提交
846 847 848 849 850 851 852 853 854 855 856
static int32_t stbSplSplitAggNodeForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE);
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
                                     (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
  }
  ++(pCxt->groupId);
  return code;
}

static int32_t stbSplSplitAggNodeForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
X
Xiaoyu Wang 已提交
857 858 859 860 861 862 863
  SLogicNode* pPartAgg = NULL;
  int32_t     code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
864
                                     (SNode*)splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT));
X
Xiaoyu Wang 已提交
865 866
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
867
  ++(pCxt->groupId);
X
Xiaoyu Wang 已提交
868 869 870
  return code;
}

X
Xiaoyu Wang 已提交
871 872 873 874 875 876 877
static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  if (stbSplIsPartTableAgg((SAggLogicNode*)pInfo->pSplitNode)) {
    return stbSplSplitAggNodeForPartTable(pCxt, pInfo);
  }
  return stbSplSplitAggNodeForCrossTable(pCxt, pInfo);
}

X
Xiaoyu Wang 已提交
878
static SNode* stbSplCreateColumnNode(SExprNode* pExpr) {
879
  SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
X
Xiaoyu Wang 已提交
880 881 882 883
  if (NULL == pCol) {
    return NULL;
  }
  if (QUERY_NODE_COLUMN == nodeType(pExpr)) {
X
Xiaoyu Wang 已提交
884 885
    strcpy(pCol->dbName, ((SColumnNode*)pExpr)->dbName);
    strcpy(pCol->tableName, ((SColumnNode*)pExpr)->tableName);
X
Xiaoyu Wang 已提交
886
    strcpy(pCol->tableAlias, ((SColumnNode*)pExpr)->tableAlias);
887 888 889
    strcpy(pCol->colName, ((SColumnNode*)pExpr)->colName);
  } else {
    strcpy(pCol->colName, pExpr->aliasName);
X
Xiaoyu Wang 已提交
890 891 892 893 894 895 896
  }
  strcpy(pCol->node.aliasName, pExpr->aliasName);
  pCol->node.resType = pExpr->resType;
  return (SNode*)pCol;
}

static SNode* stbSplCreateOrderByExpr(SOrderByExprNode* pSortKey, SNode* pCol) {
897
  SOrderByExprNode* pOutput = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
X
Xiaoyu Wang 已提交
898 899 900 901 902
  if (NULL == pOutput) {
    return NULL;
  }
  pOutput->pExpr = nodesCloneNode(pCol);
  if (NULL == pOutput->pExpr) {
903
    nodesDestroyNode((SNode*)pOutput);
X
Xiaoyu Wang 已提交
904 905 906 907 908 909 910 911 912 913 914 915 916
    return NULL;
  }
  pOutput->order = pSortKey->order;
  pOutput->nullOrder = pSortKey->nullOrder;
  return (SNode*)pOutput;
}

static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets, SNodeList** pOutput) {
  int32_t    code = TSDB_CODE_SUCCESS;
  SNodeList* pMergeKeys = NULL;
  SNode*     pNode = NULL;
  FOREACH(pNode, pSortKeys) {
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)pNode;
917
    SExprNode*        pSortExpr = (SExprNode*)pSortKey->pExpr;
X
Xiaoyu Wang 已提交
918 919 920
    SNode*            pTarget = NULL;
    bool              found = false;
    FOREACH(pTarget, pTargets) {
921 922
      if ((QUERY_NODE_COLUMN == nodeType(pSortExpr) && nodesEqualNode((SNode*)pSortExpr, pTarget)) ||
          (0 == strcmp(pSortExpr->aliasName, ((SColumnNode*)pTarget)->colName))) {
X
Xiaoyu Wang 已提交
923 924 925 926 927 928 929 930
        code = nodesListMakeStrictAppend(&pMergeKeys, stbSplCreateOrderByExpr(pSortKey, pTarget));
        if (TSDB_CODE_SUCCESS != code) {
          break;
        }
        found = true;
      }
    }
    if (TSDB_CODE_SUCCESS == code && !found) {
931
      SNode* pCol = stbSplCreateColumnNode(pSortExpr);
X
Xiaoyu Wang 已提交
932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956
      code = nodesListMakeStrictAppend(&pMergeKeys, stbSplCreateOrderByExpr(pSortKey, pCol));
      if (TSDB_CODE_SUCCESS == code) {
        code = nodesListStrictAppend(pTargets, pCol);
      } else {
        nodesDestroyNode(pCol);
      }
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    *pOutput = pMergeKeys;
  } else {
    nodesDestroyList(pMergeKeys);
  }
  return code;
}

static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOutputPartSort,
                                        SNodeList** pOutputMergeKeys) {
  SNodeList* pSortKeys = pSort->pSortKeys;
  pSort->pSortKeys = NULL;
  SNodeList* pChildren = pSort->node.pChildren;
  pSort->node.pChildren = NULL;
X
Xiaoyu Wang 已提交
957 958

  int32_t         code = TSDB_CODE_SUCCESS;
959
  SSortLogicNode* pPartSort = (SSortLogicNode*)nodesCloneNode((SNode*)pSort);
X
Xiaoyu Wang 已提交
960 961 962 963
  if (NULL == pPartSort) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }

X
Xiaoyu Wang 已提交
964
  SNodeList* pMergeKeys = NULL;
X
Xiaoyu Wang 已提交
965
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
966
    pPartSort->node.pChildren = pChildren;
X
Xiaoyu Wang 已提交
967
    splSetParent((SLogicNode*)pPartSort);
X
Xiaoyu Wang 已提交
968
    pPartSort->pSortKeys = pSortKeys;
969
    pPartSort->groupSort = pSort->groupSort;
X
Xiaoyu Wang 已提交
970
    code = stbSplCreateMergeKeys(pPartSort->pSortKeys, pPartSort->node.pTargets, &pMergeKeys);
X
Xiaoyu Wang 已提交
971 972 973
  }

  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
974 975
    *pOutputPartSort = (SLogicNode*)pPartSort;
    *pOutputMergeKeys = pMergeKeys;
X
Xiaoyu Wang 已提交
976
  } else {
977
    nodesDestroyNode((SNode*)pPartSort);
X
Xiaoyu Wang 已提交
978
    nodesDestroyList(pMergeKeys);
X
Xiaoyu Wang 已提交
979 980 981 982 983
  }

  return code;
}

984 985 986 987 988 989 990 991 992 993 994 995 996
static void stbSplSetScanPartSort(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    SScanLogicNode* pScan = (SScanLogicNode*)pNode;
    if (NULL != pScan->pGroupTags) {
      pScan->groupSort = true;
    }
  } else {
    if (1 == LIST_LENGTH(pNode->pChildren)) {
      stbSplSetScanPartSort((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
    }
  }
}

X
Xiaoyu Wang 已提交
997 998
static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pPartSort = NULL;
X
Xiaoyu Wang 已提交
999
  SNodeList*  pMergeKeys = NULL;
1000
  bool        groupSort = ((SSortLogicNode*)pInfo->pSplitNode)->groupSort;
X
Xiaoyu Wang 已提交
1001
  int32_t     code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort, &pMergeKeys);
X
Xiaoyu Wang 已提交
1002
  if (TSDB_CODE_SUCCESS == code) {
1003 1004
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pPartSort, groupSort);
  }
X
Xiaoyu Wang 已提交
1005
  if (TSDB_CODE_SUCCESS == code) {
1006 1007 1008 1009
    nodesDestroyNode((SNode*)pInfo->pSplitNode);
    if (groupSort) {
      stbSplSetScanPartSort(pPartSort);
    }
X
Xiaoyu Wang 已提交
1010
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
1011
                                     (SNode*)splCreateScanSubplan(pCxt, pPartSort, SPLIT_FLAG_STABLE_SPLIT));
X
Xiaoyu Wang 已提交
1012 1013
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
1014
  ++(pCxt->groupId);
X
Xiaoyu Wang 已提交
1015 1016 1017
  return code;
}

1018
static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
1019
  SLogicNode* pSplitNode = pInfo->pSplitNode;
1020 1021
  if (NULL != pInfo->pSplitNode->pParent && QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pInfo->pSplitNode->pParent) &&
      NULL == pInfo->pSplitNode->pParent->pLimit && NULL == pInfo->pSplitNode->pParent->pSlimit) {
1022
    pSplitNode = pInfo->pSplitNode->pParent;
1023 1024 1025 1026 1027
    if (NULL != pInfo->pSplitNode->pLimit) {
      pSplitNode->pLimit = nodesCloneNode(pInfo->pSplitNode->pLimit);
      if (NULL == pSplitNode->pLimit) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
X
Xiaoyu Wang 已提交
1028
      ((SLimitNode*)pInfo->pSplitNode->pLimit)->limit += ((SLimitNode*)pInfo->pSplitNode->pLimit)->offset;
1029 1030
      ((SLimitNode*)pInfo->pSplitNode->pLimit)->offset = 0;
    }
1031 1032
  }
  int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pSplitNode, SUBPLAN_TYPE_MERGE);
X
Xiaoyu Wang 已提交
1033 1034
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
1035
                                     (SNode*)splCreateScanSubplan(pCxt, pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
X
Xiaoyu Wang 已提交
1036
  }
1037
  ++(pCxt->groupId);
X
Xiaoyu Wang 已提交
1038 1039 1040
  return code;
}

1041
static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
1042
  SLogicNode* pSplitNode = pInfo->pSplitNode;
1043 1044
  if (NULL != pInfo->pSplitNode->pParent && QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pInfo->pSplitNode->pParent) &&
      NULL == pInfo->pSplitNode->pParent->pLimit && NULL == pInfo->pSplitNode->pParent->pSlimit) {
1045 1046 1047
    pSplitNode = pInfo->pSplitNode->pParent;
  }
  int32_t code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pSplitNode, NULL, pSplitNode, true);
1048 1049
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
1050
                                     (SNode*)splCreateScanSubplan(pCxt, pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
1051 1052 1053 1054 1055 1056
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}

X
Xiaoyu Wang 已提交
1057
static SNode* stbSplFindPrimaryKeyFromScan(SScanLogicNode* pScan) {
1058
  bool   find = false;
X
Xiaoyu Wang 已提交
1059 1060 1061
  SNode* pCol = NULL;
  FOREACH(pCol, pScan->pScanCols) {
    if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pCol)->colId) {
1062 1063 1064 1065 1066 1067 1068 1069 1070 1071
      find = true;
      break;
    }
  }
  if (!find) {
    return NULL;
  }
  SNode* pTarget = NULL;
  FOREACH(pTarget, pScan->node.pTargets) {
    if (nodesEqualNode(pTarget, pCol)) {
X
Xiaoyu Wang 已提交
1072 1073 1074
      return pCol;
    }
  }
1075 1076
  nodesListStrictAppend(pScan->node.pTargets, nodesCloneNode(pCol));
  return pCol;
X
Xiaoyu Wang 已提交
1077 1078
}

1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094
static int32_t stbSplCreateMergeScanNode(SScanLogicNode* pScan, SLogicNode** pOutputMergeScan,
                                         SNodeList** pOutputMergeKeys) {
  SNodeList* pChildren = pScan->node.pChildren;
  pScan->node.pChildren = NULL;

  int32_t         code = TSDB_CODE_SUCCESS;
  SScanLogicNode* pMergeScan = (SScanLogicNode*)nodesCloneNode((SNode*)pScan);
  if (NULL == pMergeScan) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pMergeKeys = NULL;
  if (TSDB_CODE_SUCCESS == code) {
    pMergeScan->scanType = SCAN_TYPE_TABLE_MERGE;
    pMergeScan->node.pChildren = pChildren;
    splSetParent((SLogicNode*)pMergeScan);
1095 1096
    code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pMergeScan),
                                             pMergeScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, &pMergeKeys);
1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109
  }

  if (TSDB_CODE_SUCCESS == code) {
    *pOutputMergeScan = (SLogicNode*)pMergeScan;
    *pOutputMergeKeys = pMergeKeys;
  } else {
    nodesDestroyNode((SNode*)pMergeScan);
    nodesDestroyList(pMergeKeys);
  }

  return code;
}

1110 1111
static int32_t stbSplSplitMergeScanNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan,
                                        bool groupSort) {
1112 1113 1114
  SLogicNode* pMergeScan = NULL;
  SNodeList*  pMergeKeys = NULL;
  int32_t     code = stbSplCreateMergeScanNode(pScan, &pMergeScan, &pMergeKeys);
X
Xiaoyu Wang 已提交
1115
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
1116 1117 1118 1119
    if (NULL != pMergeScan->pLimit) {
      ((SLimitNode*)pMergeScan->pLimit)->limit += ((SLimitNode*)pMergeScan->pLimit)->offset;
      ((SLimitNode*)pMergeScan->pLimit)->offset = 0;
    }
1120
    code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, pMergeScan, groupSort);
X
Xiaoyu Wang 已提交
1121 1122
  }
  if (TSDB_CODE_SUCCESS == code) {
1123
    nodesDestroyNode((SNode*)pScan);
X
Xiaoyu Wang 已提交
1124
    code = nodesListMakeStrictAppend(&pSubplan->pChildren,
1125
                                     (SNode*)splCreateScanSubplan(pCxt, pMergeScan, SPLIT_FLAG_STABLE_SPLIT));
X
Xiaoyu Wang 已提交
1126
  }
X
Xiaoyu Wang 已提交
1127
  ++(pCxt->groupId);
X
Xiaoyu Wang 已提交
1128 1129 1130
  return code;
}

1131 1132 1133 1134
static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SScanLogicNode* pScan = (SScanLogicNode*)pInfo->pSplitNode;
  if (SCAN_TYPE_TABLE_MERGE == pScan->scanType) {
    pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
1135
    return stbSplSplitMergeScanNode(pCxt, pInfo->pSubplan, pScan, true);
1136 1137 1138 1139 1140 1141 1142
  }
  if (NULL != pScan->pGroupTags) {
    return stbSplSplitScanNodeWithPartTags(pCxt, pInfo);
  }
  return stbSplSplitScanNodeWithoutPartTags(pCxt, pInfo);
}

X
Xiaoyu Wang 已提交
1143 1144 1145 1146 1147
static int32_t stbSplSplitJoinNodeImpl(SSplitContext* pCxt, SLogicSubplan* pSubplan, SJoinLogicNode* pJoin) {
  int32_t code = TSDB_CODE_SUCCESS;
  SNode*  pChild = NULL;
  FOREACH(pChild, pJoin->node.pChildren) {
    if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild)) {
1148
      code = stbSplSplitMergeScanNode(pCxt, pSubplan, (SScanLogicNode*)pChild, false);
X
Xiaoyu Wang 已提交
1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160
    } else if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild)) {
      code = stbSplSplitJoinNodeImpl(pCxt, pSubplan, (SJoinLogicNode*)pChild);
    } else {
      code = TSDB_CODE_PLAN_INTERNAL_ERROR;
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }
  return code;
}

X
Xiaoyu Wang 已提交
1161
static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
X
Xiaoyu Wang 已提交
1162 1163 1164 1165 1166 1167
  int32_t code = stbSplSplitJoinNodeImpl(pCxt, pInfo->pSubplan, (SJoinLogicNode*)pInfo->pSplitNode);
  if (TSDB_CODE_SUCCESS == code) {
    pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
    SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
  }
  return code;
X
Xiaoyu Wang 已提交
1168 1169
}

1170
static int32_t stbSplCreateMergeKeysForPartitionNode(SLogicNode* pPart, SNodeList** pMergeKeys) {
1171 1172
  SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pPart->pChildren, 0);
  SNode*          pPrimaryKey = nodesCloneNode(stbSplFindPrimaryKeyFromScan(pScan));
1173 1174 1175 1176 1177
  if (NULL == pPrimaryKey) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  int32_t code = nodesListAppend(pPart->pTargets, pPrimaryKey);
  if (TSDB_CODE_SUCCESS == code) {
1178
    code = stbSplCreateMergeKeysByPrimaryKey(pPrimaryKey, pScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, pMergeKeys);
1179 1180 1181 1182
  }
  return code;
}

1183
static int32_t stbSplSplitPartitionNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
1184 1185 1186
  int32_t    code = TSDB_CODE_SUCCESS;
  SNodeList* pMergeKeys = NULL;
  if (pInfo->pSplitNode->requireDataOrder >= DATA_ORDER_LEVEL_IN_GROUP) {
1187
    code = stbSplCreateMergeKeysForPartitionNode(pInfo->pSplitNode, &pMergeKeys);
1188 1189 1190 1191
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pInfo->pSplitNode, true);
  }
1192 1193 1194 1195
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
                                     (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
  }
1196
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
1197 1198 1199 1200
  ++(pCxt->groupId);
  return code;
}

X
Xiaoyu Wang 已提交
1201
static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
1202 1203 1204 1205
  if (pCxt->pPlanCxt->rSmaQuery) {
    return TSDB_CODE_SUCCESS;
  }

X
Xiaoyu Wang 已提交
1206 1207
  SStableSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_STABLE_SPLIT, (FSplFindSplitNode)stbSplFindSplitNode, &info)) {
1208 1209
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
1210 1211 1212

  int32_t code = TSDB_CODE_SUCCESS;
  switch (nodeType(info.pSplitNode)) {
X
Xiaoyu Wang 已提交
1213 1214 1215 1216 1217 1218
    case QUERY_NODE_LOGIC_PLAN_SCAN:
      code = stbSplSplitScanNode(pCxt, &info);
      break;
    case QUERY_NODE_LOGIC_PLAN_JOIN:
      code = stbSplSplitJoinNode(pCxt, &info);
      break;
1219 1220 1221
    case QUERY_NODE_LOGIC_PLAN_PARTITION:
      code = stbSplSplitPartitionNode(pCxt, &info);
      break;
X
Xiaoyu Wang 已提交
1222 1223 1224
    case QUERY_NODE_LOGIC_PLAN_AGG:
      code = stbSplSplitAggNode(pCxt, &info);
      break;
X
Xiaoyu Wang 已提交
1225 1226 1227
    case QUERY_NODE_LOGIC_PLAN_WINDOW:
      code = stbSplSplitWindowNode(pCxt, &info);
      break;
X
Xiaoyu Wang 已提交
1228 1229 1230
    case QUERY_NODE_LOGIC_PLAN_SORT:
      code = stbSplSplitSortNode(pCxt, &info);
      break;
X
Xiaoyu Wang 已提交
1231 1232
    default:
      break;
1233
  }
X
Xiaoyu Wang 已提交
1234

1235 1236 1237 1238
  pCxt->split = true;
  return code;
}

X
Xiaoyu Wang 已提交
1239 1240 1241 1242 1243 1244
typedef struct SSigTbJoinSplitInfo {
  SJoinLogicNode* pJoin;
  SLogicNode*     pSplitNode;
  SLogicSubplan*  pSubplan;
} SSigTbJoinSplitInfo;

X
Xiaoyu Wang 已提交
1245 1246 1247 1248 1249 1250
static bool sigTbJoinSplNeedSplit(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode)) {
    return false;
  }

  SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode;
X
Xiaoyu Wang 已提交
1251 1252 1253 1254 1255
  if (!pJoin->isSingleTableJoin) {
    return false;
  }
  return QUERY_NODE_LOGIC_PLAN_EXCHANGE != nodeType(nodesListGetNode(pJoin->node.pChildren, 0)) &&
         QUERY_NODE_LOGIC_PLAN_EXCHANGE != nodeType(nodesListGetNode(pJoin->node.pChildren, 1));
1256 1257
}

X
Xiaoyu Wang 已提交
1258 1259 1260 1261 1262
static bool sigTbJoinSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
                                      SSigTbJoinSplitInfo* pInfo) {
  if (sigTbJoinSplNeedSplit(pNode)) {
    pInfo->pJoin = (SJoinLogicNode*)pNode;
    pInfo->pSplitNode = (SLogicNode*)nodesListGetNode(pNode->pChildren, 1);
1263
    pInfo->pSubplan = pSubplan;
X
Xiaoyu Wang 已提交
1264
    return true;
1265
  }
X
Xiaoyu Wang 已提交
1266
  return false;
1267 1268
}

X
Xiaoyu Wang 已提交
1269 1270 1271
static int32_t singleTableJoinSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SSigTbJoinSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)sigTbJoinSplFindSplitNode, &info)) {
X
Xiaoyu Wang 已提交
1272 1273
    return TSDB_CODE_SUCCESS;
  }
1274
  int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType);
X
Xiaoyu Wang 已提交
1275
  if (TSDB_CODE_SUCCESS == code) {
1276
    code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, info.pSplitNode, 0));
X
Xiaoyu Wang 已提交
1277 1278 1279 1280 1281 1282
  }
  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}

1283 1284 1285 1286 1287 1288 1289 1290
static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubplan, SLogicNode* pSplitNode) {
  SNodeList* pSubplanChildren = pUnionSubplan->pChildren;
  pUnionSubplan->pChildren = NULL;

  int32_t code = TSDB_CODE_SUCCESS;

  SNode* pChild = NULL;
  FOREACH(pChild, pSplitNode->pChildren) {
X
Xiaoyu Wang 已提交
1291
    SLogicSubplan* pNewSubplan = splCreateSubplan(pCxt, (SLogicNode*)pChild);
1292
    code = nodesListMakeStrictAppend(&pUnionSubplan->pChildren, (SNode*)pNewSubplan);
1293 1294
    if (TSDB_CODE_SUCCESS == code) {
      REPLACE_NODE(NULL);
1295
      code = splMountSubplan(pNewSubplan, pSubplanChildren);
1296 1297 1298 1299
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
1300
    ++(pCxt->groupId);
1301 1302 1303
  }
  if (TSDB_CODE_SUCCESS == code) {
    nodesDestroyList(pSubplanChildren);
X
Xiaoyu Wang 已提交
1304
    NODES_DESTORY_LIST(pSplitNode->pChildren);
1305 1306 1307 1308
  }
  return code;
}

X
Xiaoyu Wang 已提交
1309 1310 1311 1312 1313
typedef struct SUnionAllSplitInfo {
  SProjectLogicNode* pProject;
  SLogicSubplan*     pSubplan;
} SUnionAllSplitInfo;

X
Xiaoyu Wang 已提交
1314 1315
static bool unAllSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
                                  SUnionAllSplitInfo* pInfo) {
X
Xiaoyu Wang 已提交
1316
  if (QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
X
Xiaoyu Wang 已提交
1317
    pInfo->pProject = (SProjectLogicNode*)pNode;
X
Xiaoyu Wang 已提交
1318
    pInfo->pSubplan = pSubplan;
X
Xiaoyu Wang 已提交
1319
    return true;
X
Xiaoyu Wang 已提交
1320
  }
X
Xiaoyu Wang 已提交
1321
  return false;
X
Xiaoyu Wang 已提交
1322 1323
}

1324 1325
static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGroupId, SLogicSubplan* pSubplan,
                                          SProjectLogicNode* pProject) {
1326
  SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
X
Xiaoyu Wang 已提交
1327 1328 1329
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
1330 1331
  pExchange->srcStartGroupId = startGroupId;
  pExchange->srcEndGroupId = pCxt->groupId - 1;
X
Xiaoyu Wang 已提交
1332
  pExchange->node.precision = pProject->node.precision;
X
Xiaoyu Wang 已提交
1333 1334 1335 1336
  pExchange->node.pTargets = nodesCloneList(pProject->node.pTargets);
  if (NULL == pExchange->node.pTargets) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
1337
  TSWAP(pExchange->node.pLimit, pProject->node.pLimit);
X
Xiaoyu Wang 已提交
1338 1339 1340

  pSubplan->subplanType = SUBPLAN_TYPE_MERGE;

X
Xiaoyu Wang 已提交
1341 1342
  if (NULL == pProject->node.pParent) {
    pSubplan->pNode = (SLogicNode*)pExchange;
1343
    nodesDestroyNode((SNode*)pProject);
X
Xiaoyu Wang 已提交
1344 1345 1346 1347 1348
    return TSDB_CODE_SUCCESS;
  }

  SNode* pNode;
  FOREACH(pNode, pProject->node.pParent->pChildren) {
1349
    if (nodesEqualNode(pNode, (SNode*)pProject)) {
X
Xiaoyu Wang 已提交
1350 1351 1352 1353 1354
      REPLACE_NODE(pExchange);
      nodesDestroyNode(pNode);
      return TSDB_CODE_SUCCESS;
    }
  }
1355 1356
  nodesDestroyNode((SNode*)pExchange);
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
X
Xiaoyu Wang 已提交
1357 1358
}

X
Xiaoyu Wang 已提交
1359 1360
static int32_t unionAllSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SUnionAllSplitInfo info = {0};
X
Xiaoyu Wang 已提交
1361
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unAllSplFindSplitNode, &info)) {
1362 1363
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
1364

1365
  int32_t startGroupId = pCxt->groupId;
1366
  int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pProject);
X
Xiaoyu Wang 已提交
1367
  if (TSDB_CODE_SUCCESS == code) {
1368
    code = unAllSplCreateExchangeNode(pCxt, startGroupId, info.pSubplan, info.pProject);
X
Xiaoyu Wang 已提交
1369
  }
1370
  pCxt->split = true;
X
Xiaoyu Wang 已提交
1371 1372 1373
  return code;
}

X
Xiaoyu Wang 已提交
1374 1375 1376 1377 1378
typedef struct SUnionDistinctSplitInfo {
  SAggLogicNode* pAgg;
  SLogicSubplan* pSubplan;
} SUnionDistinctSplitInfo;

1379 1380
static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGroupId, SLogicSubplan* pSubplan,
                                           SAggLogicNode* pAgg) {
1381
  SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
X
Xiaoyu Wang 已提交
1382 1383 1384
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
1385 1386
  pExchange->srcStartGroupId = startGroupId;
  pExchange->srcEndGroupId = pCxt->groupId - 1;
X
Xiaoyu Wang 已提交
1387
  pExchange->node.precision = pAgg->node.precision;
X
Xiaoyu Wang 已提交
1388
  pExchange->node.pTargets = nodesCloneList(pAgg->pGroupKeys);
X
Xiaoyu Wang 已提交
1389 1390 1391 1392 1393 1394
  if (NULL == pExchange->node.pTargets) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pSubplan->subplanType = SUBPLAN_TYPE_MERGE;

1395
  return nodesListMakeAppend(&pAgg->node.pChildren, (SNode*)pExchange);
X
Xiaoyu Wang 已提交
1396 1397
}

X
Xiaoyu Wang 已提交
1398 1399 1400 1401
static bool unDistSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
                                   SUnionDistinctSplitInfo* pInfo) {
  if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
    pInfo->pAgg = (SAggLogicNode*)pNode;
X
Xiaoyu Wang 已提交
1402
    pInfo->pSubplan = pSubplan;
X
Xiaoyu Wang 已提交
1403
    return true;
X
Xiaoyu Wang 已提交
1404
  }
X
Xiaoyu Wang 已提交
1405
  return false;
X
Xiaoyu Wang 已提交
1406 1407
}

X
Xiaoyu Wang 已提交
1408 1409
static int32_t unionDistinctSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SUnionDistinctSplitInfo info = {0};
X
Xiaoyu Wang 已提交
1410
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unDistSplFindSplitNode, &info)) {
X
Xiaoyu Wang 已提交
1411 1412 1413
    return TSDB_CODE_SUCCESS;
  }

1414
  int32_t startGroupId = pCxt->groupId;
1415
  int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pAgg);
X
Xiaoyu Wang 已提交
1416
  if (TSDB_CODE_SUCCESS == code) {
1417
    code = unDistSplCreateExchangeNode(pCxt, startGroupId, info.pSubplan, info.pAgg);
X
Xiaoyu Wang 已提交
1418 1419 1420 1421 1422
  }
  pCxt->split = true;
  return code;
}

X
Xiaoyu Wang 已提交
1423 1424 1425 1426 1427
typedef struct SSmaIndexSplitInfo {
  SMergeLogicNode* pMerge;
  SLogicSubplan*   pSubplan;
} SSmaIndexSplitInfo;

X
Xiaoyu Wang 已提交
1428 1429
static bool smaIdxSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
                                   SSmaIndexSplitInfo* pInfo) {
X
Xiaoyu Wang 已提交
1430
  if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
X
Xiaoyu Wang 已提交
1431
    pInfo->pMerge = (SMergeLogicNode*)pNode;
X
Xiaoyu Wang 已提交
1432
    pInfo->pSubplan = pSubplan;
X
Xiaoyu Wang 已提交
1433
    return true;
X
Xiaoyu Wang 已提交
1434
  }
X
Xiaoyu Wang 已提交
1435
  return false;
X
Xiaoyu Wang 已提交
1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452
}

static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SSmaIndexSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)smaIdxSplFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pMerge);
  if (TSDB_CODE_SUCCESS == code) {
    info.pMerge->srcGroupId = pCxt->groupId;
  }
  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}

1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474
typedef struct SInsertSelectSplitInfo {
  SLogicNode*    pQueryRoot;
  SLogicSubplan* pSubplan;
} SInsertSelectSplitInfo;

static bool insSelSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
                                   SInsertSelectSplitInfo* pInfo) {
  if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pNode) && 1 == LIST_LENGTH(pNode->pChildren) &&
      MODIFY_TABLE_TYPE_INSERT == ((SVnodeModifyLogicNode*)pNode)->modifyType) {
    pInfo->pQueryRoot = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
    pInfo->pSubplan = pSubplan;
    return true;
  }
  return false;
}

static int32_t insertSelectSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SInsertSelectSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_INSERT_SPLIT, (FSplFindSplitNode)insSelSplFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

1475 1476
  SLogicSubplan* pNewSubplan = NULL;
  SNodeList*     pSubplanChildren = info.pSubplan->pChildren;
1477
  int32_t        code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pQueryRoot, SUBPLAN_TYPE_MODIFY);
1478
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
1479
    pNewSubplan = splCreateSubplan(pCxt, info.pQueryRoot);
1480 1481 1482 1483 1484 1485
    if (NULL == pNewSubplan) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)pNewSubplan);
1486
  }
1487
  if (TSDB_CODE_SUCCESS == code) {
1488
    code = splMountSubplan(pNewSubplan, pSubplanChildren);
1489 1490
  }

1491
  SPLIT_FLAG_SET_MASK(info.pSubplan->splitFlag, SPLIT_FLAG_INSERT_SPLIT);
1492 1493 1494 1495 1496
  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}

X
Xiaoyu Wang 已提交
1497 1498 1499 1500 1501 1502 1503
typedef struct SQnodeSplitInfo {
  SLogicNode*    pSplitNode;
  SLogicSubplan* pSubplan;
} SQnodeSplitInfo;

static bool qndSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
                                SQnodeSplitInfo* pInfo) {
1504
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && NULL != pNode->pParent &&
X
Xiaoyu Wang 已提交
1505 1506
      QUERY_NODE_LOGIC_PLAN_INTERP_FUNC != nodeType(pNode->pParent) && ((SScanLogicNode*)pNode)->scanSeq[0] <= 1 &&
      ((SScanLogicNode*)pNode)->scanSeq[1] <= 1) {
X
Xiaoyu Wang 已提交
1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522
    pInfo->pSplitNode = pNode;
    pInfo->pSubplan = pSubplan;
    return true;
  }
  return false;
}

static int32_t qnodeSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  if (QUERY_POLICY_QNODE != tsQueryPolicy) {
    return TSDB_CODE_SUCCESS;
  }

  SQnodeSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)qndSplFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
1523
  ((SScanLogicNode*)info.pSplitNode)->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;
X
Xiaoyu Wang 已提交
1524 1525
  int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType);
  if (TSDB_CODE_SUCCESS == code) {
1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, info.pSplitNode, 0);
    if (NULL != pScanSubplan) {
      if (NULL != info.pSubplan->pVgroupList) {
        info.pSubplan->numOfComputeNodes = info.pSubplan->pVgroupList->numOfVgroups;
        TSWAP(pScanSubplan->pVgroupList, info.pSubplan->pVgroupList);
      } else {
        info.pSubplan->numOfComputeNodes = 1;
      }
      code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)pScanSubplan);
    } else {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
X
Xiaoyu Wang 已提交
1538
  }
1539
  info.pSubplan->subplanType = SUBPLAN_TYPE_COMPUTE;
X
Xiaoyu Wang 已提交
1540 1541 1542 1543 1544
  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}

X
Xiaoyu Wang 已提交
1545 1546 1547 1548 1549
// clang-format off
static const SSplitRule splitRuleSet[] = {
  {.pName = "SuperTableSplit",      .splitFunc = stableSplit},
  {.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit},
  {.pName = "UnionAllSplit",        .splitFunc = unionAllSplit},
X
Xiaoyu Wang 已提交
1550
  {.pName = "UnionDistinctSplit",   .splitFunc = unionDistinctSplit},
1551
  {.pName = "SmaIndexSplit",        .splitFunc = smaIndexSplit}, // not used yet
1552
  {.pName = "InsertSelectSplit",    .splitFunc = insertSelectSplit}
X
Xiaoyu Wang 已提交
1553 1554
};
// clang-format on
X
Xiaoyu Wang 已提交
1555 1556 1557

static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));

1558
static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
1559
  if (!tsQueryPlannerTrace) {
1560 1561
    return;
  }
1562
  char* pStr = NULL;
1563
  nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL);
1564 1565 1566 1567 1568
  if (NULL == pRuleName) {
    qDebugL("before split: %s", pStr);
  } else {
    qDebugL("apply split %s rule: %s", pRuleName, pStr);
  }
1569 1570 1571
  taosMemoryFree(pStr);
}

1572 1573 1574 1575
static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) {
  SSplitContext cxt = {
      .pPlanCxt = pCxt, .queryId = pSubplan->id.queryId, .groupId = pSubplan->id.groupId + 1, .split = false};
  bool split = false;
1576
  dumpLogicSubplan(NULL, pSubplan);
X
Xiaoyu Wang 已提交
1577
  do {
1578
    split = false;
X
Xiaoyu Wang 已提交
1579
    for (int32_t i = 0; i < splitRuleNum; ++i) {
1580
      cxt.split = false;
1581
      int32_t code = splitRuleSet[i].splitFunc(&cxt, pSubplan);
X
Xiaoyu Wang 已提交
1582 1583 1584
      if (TSDB_CODE_SUCCESS != code) {
        return code;
      }
1585 1586 1587 1588
      if (cxt.split) {
        split = true;
        dumpLogicSubplan(splitRuleSet[i].pName, pSubplan);
      }
X
Xiaoyu Wang 已提交
1589
    }
1590
  } while (split);
1591
  return qnodeSplit(&cxt, pSubplan);
X
Xiaoyu Wang 已提交
1592
}
X
Xiaoyu Wang 已提交
1593

X
Xiaoyu Wang 已提交
1594 1595 1596 1597 1598 1599 1600 1601 1602 1603
static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    TSWAP(((SScanLogicNode*)pNode)->pVgroupList, pSubplan->pVgroupList);
    return;
  }

  SNode* pChild;
  FOREACH(pChild, pNode->pChildren) { setVgroupsInfo((SLogicNode*)pChild, pSubplan); }
}

1604 1605 1606 1607 1608 1609 1610 1611
static bool needSplitSubplan(SLogicSubplan* pLogicSubplan) {
  if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY != nodeType(pLogicSubplan->pNode)) {
    return true;
  }
  SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pLogicSubplan->pNode;
  return (MODIFY_TABLE_TYPE_INSERT == pModify->modifyType && NULL != pModify->node.pChildren);
}

X
Xiaoyu Wang 已提交
1612
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
1613
  if (!needSplitSubplan(pLogicSubplan)) {
X
Xiaoyu Wang 已提交
1614 1615
    setVgroupsInfo(pLogicSubplan->pNode, pLogicSubplan);
    return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
1616
  }
X
Xiaoyu Wang 已提交
1617 1618
  return applySplitRule(pCxt, pLogicSubplan);
}