planSpliter.c 54.4 KB
Newer Older
X
Xiaoyu Wang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

X
Xiaoyu Wang 已提交
16
#include "functionMgt.h"
X
Xiaoyu Wang 已提交
17
#include "planInt.h"
X
Xiaoyu Wang 已提交
18
#include "tglobal.h"
X
Xiaoyu Wang 已提交
19

X
Xiaoyu Wang 已提交
20
// Builds a single-bit mask for bit position n. The argument is parenthesized so that
// expressions with lower precedence than '<<' (e.g. a conditional) expand correctly.
#define SPLIT_FLAG_MASK(n) (1 << (n))

// Bits kept in SLogicSubplan::splitFlag; splMatch() skips a subplan that already carries the flag it is asked for.
#define SPLIT_FLAG_STABLE_SPLIT SPLIT_FLAG_MASK(0)
#define SPLIT_FLAG_INSERT_SPLIT SPLIT_FLAG_MASK(1)

// Sets the bits of mask in val (val must be an lvalue). The whole expansion is parenthesized
// so the macro stays a single expression wherever it is used.
#define SPLIT_FLAG_SET_MASK(val, mask)  ((val) |= (mask))
// Evaluates to true when at least one bit of mask is set in val.
#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)

typedef struct SSplitContext {
29 30 31 32
  SPlanContext* pPlanCxt;
  uint64_t      queryId;
  int32_t       groupId;
  bool          split;
X
Xiaoyu Wang 已提交
33 34
} SSplitContext;

35
// Signature of a split rule applied to pSubplan; returns an int32_t code (presumably a TSDB_CODE_* value).
typedef int32_t (*FSplit)(SSplitContext* pCxt, SLogicSubplan* pSubplan);
X
Xiaoyu Wang 已提交
36 37

typedef struct SSplitRule {
X
Xiaoyu Wang 已提交
38
  char*  pName;
X
Xiaoyu Wang 已提交
39 40 41
  FSplit splitFunc;
} SSplitRule;

X
Xiaoyu Wang 已提交
42
// Predicate invoked by splMatchByNode() on each logic node; returning true stops the traversal.
// pInfo is an opaque, rule-specific argument passed through unchanged.
typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, void* pInfo);
X
Xiaoyu Wang 已提交
43

X
Xiaoyu Wang 已提交
44 45 46 47 48 49 50 51 52 53 54
// Swaps the vgroup list of the scan found below pNode with pSubplan->pVgroupList.
// Only chains of single-child nodes are followed; if a non-scan node has zero or
// several children the function returns without touching pSubplan.
static void splSetSubplanVgroups(SLogicSubplan* pSubplan, SLogicNode* pNode) {
  SLogicNode* pCur = pNode;
  while (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pCur)) {
    if (1 != LIST_LENGTH(pCur->pChildren)) {
      return;
    }
    pCur = (SLogicNode*)nodesListGetNode(pCur->pChildren, 0);
  }
  TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pCur)->pVgroupList);
}

// Creates a SUBPLAN_TYPE_SCAN subplan rooted at pNode.
// The subplan takes the current query/group id from pCxt, pNode is detached from its
// parent, the vgroup list of the underlying scan is moved into the subplan, and the
// given flag bits are set in splitFlag.
// Returns NULL when the subplan node cannot be allocated.
static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SLogicNode* pNode, int32_t flag) {
  SLogicSubplan* pScanSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
  if (NULL != pScanSubplan) {
    pScanSubplan->id.queryId = pCxt->queryId;
    pScanSubplan->id.groupId = pCxt->groupId;
    pScanSubplan->subplanType = SUBPLAN_TYPE_SCAN;
    pScanSubplan->pNode = pNode;
    pNode->pParent = NULL;
    splSetSubplanVgroups(pScanSubplan, pNode);
    SPLIT_FLAG_SET_MASK(pScanSubplan->splitFlag, flag);
  }
  return pScanSubplan;
}

X
Xiaoyu Wang 已提交
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
// Returns true when pNode is a scan node or any node in its subtree is one.
// Every child is examined: the search only stops early once a scan has been found,
// so a scan under the second (or later) child of a multi-child node is detected too.
static bool splHasScan(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    return true;
  }

  SNode* pChild = NULL;
  FOREACH(pChild, pNode->pChildren) {
    if (splHasScan((SLogicNode*)pChild)) {
      return true;
    }
  }

  return false;
}

// Marks the subplan as SCAN when splHasScan() reports a scan in its node tree, MERGE otherwise.
static void splSetSubplanType(SLogicSubplan* pSubplan) {
  if (splHasScan(pSubplan->pNode)) {
    pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
  } else {
    pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  }
}

// Creates a subplan rooted at pNode using the current query/group id of pCxt.
// pNode is detached from its parent and the subplan type is derived from the tree
// via splSetSubplanType(). Returns NULL when the subplan node cannot be allocated.
static SLogicSubplan* splCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode) {
  SLogicSubplan* pNewSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
  if (NULL != pNewSubplan) {
    pNewSubplan->id.queryId = pCxt->queryId;
    pNewSubplan->id.groupId = pCxt->groupId;
    pNewSubplan->pNode = pNode;
    pNode->pParent = NULL;
    splSetSubplanType(pNewSubplan);
  }
  return pNewSubplan;
}

102
// Creates an exchange node that stands in for pChild in the parent subplan.
// The exchange reads from the current group id of pCxt, copies pChild's precision
// and clones its targets. When pChild carries a limit, the exchange gets a clone of
// it and pChild's own limit is rewritten to (limit + offset, offset 0).
// On success *pOutput receives the new node. On failure the partially built node is
// destroyed, *pOutput is left untouched and TSDB_CODE_OUT_OF_MEMORY is returned.
static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SExchangeLogicNode** pOutput) {
  SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pExchange->srcStartGroupId = pCxt->groupId;
  pExchange->srcEndGroupId = pCxt->groupId;
  pExchange->node.precision = pChild->precision;
  pExchange->node.pTargets = nodesCloneList(pChild->pTargets);
  if (NULL == pExchange->node.pTargets) {
    // the node is not yet visible to the caller, so it must be released here
    nodesDestroyNode((SNode*)pExchange);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  if (NULL != pChild->pLimit) {
    pExchange->node.pLimit = nodesCloneNode(pChild->pLimit);
    if (NULL == pExchange->node.pLimit) {
      nodesDestroyNode((SNode*)pExchange);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    // the exchange keeps the original limit/offset; the child drops its offset and widens its limit by it
    ((SLimitNode*)pChild->pLimit)->limit += ((SLimitNode*)pChild->pLimit)->offset;
    ((SLimitNode*)pChild->pLimit)->offset = 0;
  }

  *pOutput = pExchange;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
128 129 130 131 132
// Replaces pSplitNode inside pSubplan with a freshly created exchange node and, on
// success, sets the subplan type to subplanType. On any failure the exchange node
// (if one was produced) is destroyed and the error code is returned.
static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
                                               ESubplanType subplanType) {
  SExchangeLogicNode* pExchange = NULL;

  int32_t code = splCreateExchangeNode(pCxt, pSplitNode, &pExchange);
  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode((SNode*)pExchange);
    return code;
  }

  code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pExchange);
  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyNode((SNode*)pExchange);
    return code;
  }

  pSubplan->subplanType = subplanType;
  return code;
}

143 144
// Tells whether the tree rooted at pLogicNode consumes the subplan group groupId:
// an exchange node matches when groupId lies in [srcStartGroupId, srcEndGroupId],
// a merge node when its srcGroupId equals groupId. Exchange and merge nodes are
// not descended into; any other node matches if one of its children does.
static bool splIsChildSubplan(SLogicNode* pLogicNode, int32_t groupId) {
  switch (nodeType(pLogicNode)) {
    case QUERY_NODE_LOGIC_PLAN_EXCHANGE: {
      SExchangeLogicNode* pExchange = (SExchangeLogicNode*)pLogicNode;
      return groupId >= pExchange->srcStartGroupId && groupId <= pExchange->srcEndGroupId;
    }
    case QUERY_NODE_LOGIC_PLAN_MERGE:
      return ((SMergeLogicNode*)pLogicNode)->srcGroupId == groupId;
    default:
      break;
  }

  SNode* pChild;
  FOREACH(pChild, pLogicNode->pChildren) {
    if (splIsChildSubplan((SLogicNode*)pChild, groupId)) {
      return true;
    }
  }
  return false;
}

163 164 165
// Moves every subplan of pChildren whose group id is consumed by pParent's node tree
// (see splIsChildSubplan) into pParent->pChildren. Subplans that do not match stay
// in pChildren. Stops and returns the error code as soon as an append fails.
static int32_t splMountSubplan(SLogicSubplan* pParent, SNodeList* pChildren) {
  SNode* pChild = NULL;
  WHERE_EACH(pChild, pChildren) {
    if (!splIsChildSubplan(pParent->pNode, ((SLogicSubplan*)pChild)->id.groupId)) {
      WHERE_NEXT;
      continue;
    }

    int32_t code = nodesListMakeAppend(&pParent->pChildren, pChild);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
    // clear the cell before erasing it — presumably so the erase does not release the subplan just moved
    REPLACE_NODE(NULL);
    ERASE_NODE(pChildren);
  }
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
181 182 183 184 185 186 187 188 189 190 191 192 193 194
// Pre-order search of the node tree rooted at pNode: returns true as soon as func
// accepts a node (func is called on the parent before its children), false when no
// node in the tree is accepted.
static bool splMatchByNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, FSplFindSplitNode func,
                           void* pInfo) {
  if (func(pCxt, pSubplan, pNode, pInfo)) {
    return true;
  }
  SNode* pChild;
  FOREACH(pChild, pNode->pChildren) {
    if (splMatchByNode(pCxt, pSubplan, (SLogicNode*)pChild, func, pInfo)) {
      return true;
    }
  }
  return false;  // was NULL, which is a pointer constant and not a bool value
}

195 196
// Searches pSubplan and, recursively, its child subplans for a node accepted by func.
// The node tree of a subplan is only inspected when none of the bits in flag is set
// in its splitFlag; its child subplans are visited either way.
static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag, FSplFindSplitNode func, void* pInfo) {
  bool flagged = SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, flag);
  if (!flagged && splMatchByNode(pCxt, pSubplan, pSubplan->pNode, func, pInfo)) {
    return true;
  }

  SNode* pChildSubplan;
  FOREACH(pChildSubplan, pSubplan->pChildren) {
    if (splMatch(pCxt, (SLogicSubplan*)pChildSubplan, flag, func, pInfo)) {
      return true;
    }
  }
  return false;
}

X
Xiaoyu Wang 已提交
210 211 212 213 214
// Points the pParent link of every direct child of pNode back at pNode.
static void splSetParent(SLogicNode* pNode) {
  SNode* pItem = NULL;
  FOREACH(pItem, pNode->pChildren) {
    SLogicNode* pChildNode = (SLogicNode*)pItem;
    pChildNode->pParent = pNode;
  }
}

X
Xiaoyu Wang 已提交
215
typedef struct SStableSplitInfo {
X
Xiaoyu Wang 已提交
216 217
  SLogicNode*    pSplitNode;
  SLogicSubplan* pSubplan;
X
Xiaoyu Wang 已提交
218 219
} SStableSplitInfo;

X
Xiaoyu Wang 已提交
220 221 222
// Returns true when pFuncs contains a function that is neither a window pseudo
// column nor reported by fmIsDistExecFunc().
static bool stbSplHasGatherExecFunc(const SNodeList* pFuncs) {
  SNode* pItem = NULL;
  FOREACH(pItem, pFuncs) {
    SFunctionNode* pFunc = (SFunctionNode*)pItem;
    bool           isPseudo = fmIsWindowPseudoColumnFunc(pFunc->funcId);
    if (!isPseudo && !fmIsDistExecFunc(pFunc->funcId)) {
      return true;
    }
  }
  return false;
}

231
// A scan counts as "multi table" when it has a vgroup list with more than one vgroup.
// streamQuery is not used by this function.
static bool stbSplIsMultiTbScan(bool streamQuery, SScanLogicNode* pScan) {
  if (NULL == pScan->pVgroupList) {
    return false;
  }
  return pScan->pVgroupList->numOfVgroups > 1;
}

235
// Returns true when pNode has exactly one child that is a multi-vgroup scan, or a
// partition node whose only child is such a scan.
static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) {
  if (1 != LIST_LENGTH(pNode->pChildren)) {
    return false;
  }

  SLogicNode* pBelow = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
  if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pBelow)) {
    // look through a single partition node
    if (1 != LIST_LENGTH(pBelow->pChildren)) {
      return false;
    }
    pBelow = (SLogicNode*)nodesListGetNode(pBelow->pChildren, 0);
  }

  if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pBelow)) {
    return false;
  }
  return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pBelow);
}

249 250 251 252 253 254 255 256
// Returns true when the only child of pNode is a multi-vgroup scan.
static bool stbSplIsMultiTbScanChild(bool streamQuery, SLogicNode* pNode) {
  if (1 != LIST_LENGTH(pNode->pChildren)) {
    return false;
  }

  SNode* pOnlyChild = nodesListGetNode(pNode->pChildren, 0);
  if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pOnlyChild)) {
    return false;
  }
  return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pOnlyChild);
}

257 258 259 260 261 262 263 264 265 266 267 268 269
// Decides whether a window node over a multi-vgroup scan must be split.
//  - interval: only when no function needs gathered execution;
//  - session:  batch queries always, stream queries only when no function needs gathered execution;
//  - state:    batch queries only;
//  - anything else: never.
static bool stbSplNeedSplitWindow(bool streamQuery, SLogicNode* pNode) {
  SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode;

  switch (pWindow->winType) {
    case WINDOW_TYPE_INTERVAL:
      return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
    case WINDOW_TYPE_SESSION:
      if (streamQuery && stbSplHasGatherExecFunc(pWindow->pFuncs)) {
        return false;
      }
      return stbSplHasMultiTbScan(streamQuery, pNode);
    case WINDOW_TYPE_STATE:
      return !streamQuery && stbSplHasMultiTbScan(streamQuery, pNode);
    default:
      break;
  }

  return false;
}

282 283 284 285 286 287 288 289 290 291 292 293 294
// A join is split unless it is a single-table join or one of its children is
// something other than a scan or another join. streamQuery is not used here.
static bool stbSplNeedSplitJoin(bool streamQuery, SJoinLogicNode* pJoin) {
  if (pJoin->isSingleTableJoin) {
    return false;
  }

  SNode* pInput = NULL;
  FOREACH(pInput, pJoin->node.pChildren) {
    bool scanOrJoin =
        (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pInput)) || (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pInput));
    if (!scanOrJoin) {
      return false;
    }
  }
  return true;
}

295 296 297 298 299 300 301 302 303 304 305 306 307 308
// Returns true when the only input of pNode (optionally below one partition node)
// is a scan whose scanType is SCAN_TYPE_TABLE_COUNT.
static bool stbSplIsTableCountQuery(SLogicNode* pNode) {
  if (1 != LIST_LENGTH(pNode->pChildren)) {
    return false;
  }

  SLogicNode* pBelow = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
  if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pBelow)) {
    if (1 != LIST_LENGTH(pBelow->pChildren)) {
      return false;
    }
    pBelow = (SLogicNode*)nodesListGetNode(pBelow->pChildren, 0);
  }

  if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pBelow)) {
    return false;
  }
  return SCAN_TYPE_TABLE_COUNT == ((SScanLogicNode*)pBelow)->scanType;
}

309
// Dispatches on the node type to decide whether pNode is a split point.
// Scan and partition nodes are never split for stream queries; node types that are
// not listed here are never split.
static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    return !streamQuery && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
  }
  if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pNode)) {
    return stbSplNeedSplitJoin(streamQuery, (SJoinLogicNode*)pNode);
  }
  if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
    return !streamQuery && stbSplIsMultiTbScanChild(streamQuery, pNode);
  }
  if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode)) {
    if (stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs)) {
      return false;
    }
    return stbSplHasMultiTbScan(streamQuery, pNode) && !stbSplIsTableCountQuery(pNode);
  }
  if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode)) {
    return stbSplNeedSplitWindow(streamQuery, pNode);
  }
  if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pNode)) {
    return stbSplHasMultiTbScan(streamQuery, pNode);
  }
  return false;
}

X
Xiaoyu Wang 已提交
330 331 332 333
// Match callback: when pNode must be split, records it together with its subplan
// in pInfo and returns true; otherwise leaves pInfo untouched and returns false.
static bool stbSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
                                SStableSplitInfo* pInfo) {
  if (!stbSplNeedSplit(pCxt->pPlanCxt->streamQuery, pNode)) {
    return false;
  }
  pInfo->pSplitNode = pNode;
  pInfo->pSubplan = pSubplan;
  return true;
}

X
Xiaoyu Wang 已提交
340 341 342 343 344 345 346 347
// Builds, for every function in pFuncs, its partial and merge counterparts and
// appends them to *pPartialFuncs and *pMergeFuncs respectively.
// Window pseudo column functions are simply cloned into both lists; every other
// function is rewritten through fmGetDistMethod().
// On failure both output lists are destroyed AND reset to NULL, so the caller's
// fields never keep pointing at freed lists, and the error code is returned.
static int32_t stbSplRewriteFuns(const SNodeList* pFuncs, SNodeList** pPartialFuncs, SNodeList** pMergeFuncs) {
  SNode* pNode = NULL;
  FOREACH(pNode, pFuncs) {
    SFunctionNode* pFunc = (SFunctionNode*)pNode;
    SFunctionNode* pPartFunc = NULL;
    SFunctionNode* pMergeFunc = NULL;
    int32_t        code = TSDB_CODE_SUCCESS;
    if (fmIsWindowPseudoColumnFunc(pFunc->funcId)) {
      pPartFunc = (SFunctionNode*)nodesCloneNode(pNode);
      pMergeFunc = (SFunctionNode*)nodesCloneNode(pNode);
      if (NULL == pPartFunc || NULL == pMergeFunc) {
        nodesDestroyNode((SNode*)pPartFunc);
        nodesDestroyNode((SNode*)pMergeFunc);
        pPartFunc = NULL;
        pMergeFunc = NULL;
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    } else {
      code = fmGetDistMethod(pFunc, &pPartFunc, &pMergeFunc);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeStrictAppend(pPartialFuncs, (SNode*)pPartFunc);
      if (TSDB_CODE_SUCCESS != code) {
        // the merge function was never handed to a list, so nobody else will release it
        nodesDestroyNode((SNode*)pMergeFunc);
        pMergeFunc = NULL;
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeStrictAppend(pMergeFuncs, (SNode*)pMergeFunc);
    }
    if (TSDB_CODE_SUCCESS != code) {
      nodesDestroyList(*pPartialFuncs);
      *pPartialFuncs = NULL;
      nodesDestroyList(*pMergeFuncs);
      *pMergeFuncs = NULL;
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

// Makes sure pFuncs contains a _wstart function and reports its position in *pIndex.
// If a function of type FUNCTION_TYPE_WSTART is already present its index is returned;
// otherwise a new "_wstart" function node (aliased with its own address to keep the
// alias unique) is created and appended at the end of the list.
// Returns TSDB_CODE_SUCCESS or an error code; *pIndex is written on every path except
// the allocation failure of the new node.
static int32_t stbSplAppendWStart(SNodeList* pFuncs, int32_t* pIndex) {
  int32_t index = 0;
  SNode*  pFunc = NULL;
  FOREACH(pFunc, pFuncs) {
    if (FUNCTION_TYPE_WSTART == ((SFunctionNode*)pFunc)->funcType) {
      *pIndex = index;
      return TSDB_CODE_SUCCESS;
    }
    ++index;
  }

  SFunctionNode* pWStart = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
  if (NULL == pWStart) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  strcpy(pWStart->functionName, "_wstart");
  int64_t pointer = (int64_t)pWStart;
  snprintf(pWStart->node.aliasName, sizeof(pWStart->node.aliasName), "%s.%" PRId64 "", pWStart->functionName, pointer);
  int32_t code = fmGetFuncInfo(pWStart, NULL, 0);
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListStrictAppend(pFuncs, (SNode*)pWStart);
  } else {
    // the node was never appended, so it has to be released here
    nodesDestroyNode((SNode*)pWStart);
  }
  *pIndex = index;
  return code;
}

5
54liuyao 已提交
399 400 401 402
// Makes sure pWin->pFuncs contains a _wend function and reports its position in *pIndex.
// If a function of type FUNCTION_TYPE_WEND is already present its index is returned and
// nothing else changes. Otherwise a new "_wend" function node is appended to
// pWin->pFuncs and a matching column is added to pWin->node.pTargets through
// createColumnByRewriteExpr().
static int32_t stbSplAppendWEnd(SWindowLogicNode* pWin, int32_t* pIndex) {
  int32_t index = 0;
  SNode*  pFunc = NULL;
  FOREACH(pFunc, pWin->pFuncs) {
    if (FUNCTION_TYPE_WEND == ((SFunctionNode*)pFunc)->funcType) {
      *pIndex = index;
      return TSDB_CODE_SUCCESS;
    }
    ++index;
  }

  SFunctionNode* pWEnd = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
  if (NULL == pWEnd) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  strcpy(pWEnd->functionName, "_wend");
  int64_t pointer = (int64_t)pWEnd;
  snprintf(pWEnd->node.aliasName, sizeof(pWEnd->node.aliasName), "%s.%" PRId64 "", pWEnd->functionName, pointer);
  int32_t code = fmGetFuncInfo(pWEnd, NULL, 0);
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListStrictAppend(pWin->pFuncs, (SNode*)pWEnd);
  } else {
    // the node was never appended, so it has to be released here
    nodesDestroyNode((SNode*)pWEnd);
  }
  *pIndex = index;
  if (TSDB_CODE_SUCCESS == code) {
    code = createColumnByRewriteExpr(nodesListGetNode(pWin->pFuncs, index), &pWin->node.pTargets);
  }
  return code;
}

X
Xiaoyu Wang 已提交
428 429 430 431 432 433 434
// Splits a window node into a partial window (returned in *pPartWindow) and a merge
// window (pMergeWindow itself, modified in place).
//  - The function list, targets, children and conditions are detached from pMergeWindow
//    before cloning, so the clone does not receive copies of them.
//  - The partial window takes over the original children and gets GROUP_ACTION_KEEP;
//    the merge window gets its targets and conditions back.
//  - The original functions are rewritten into partial/merge lists, a _wstart function
//    is ensured on the partial side, and the merge window's pTspk is replaced by a clone
//    of the partial target at that _wstart index.
// If the clone fails, everything detached is put back so pMergeWindow is left exactly
// as it was. Returns TSDB_CODE_SUCCESS or an error code.
static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogicNode** pPartWindow) {
  SNodeList* pFunc = pMergeWindow->pFuncs;
  pMergeWindow->pFuncs = NULL;
  SNodeList* pTargets = pMergeWindow->node.pTargets;
  pMergeWindow->node.pTargets = NULL;
  SNodeList* pChildren = pMergeWindow->node.pChildren;
  pMergeWindow->node.pChildren = NULL;
  SNode* pConditions = pMergeWindow->node.pConditions;
  pMergeWindow->node.pConditions = NULL;

  SWindowLogicNode* pPartWin = (SWindowLogicNode*)nodesCloneNode((SNode*)pMergeWindow);
  if (NULL == pPartWin) {
    // nothing has been transferred yet: restore the detached members instead of dropping them
    pMergeWindow->pFuncs = pFunc;
    pMergeWindow->node.pTargets = pTargets;
    pMergeWindow->node.pChildren = pChildren;
    pMergeWindow->node.pConditions = pConditions;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pPartWin->node.groupAction = GROUP_ACTION_KEEP;
  pMergeWindow->node.pTargets = pTargets;
  pMergeWindow->node.pConditions = pConditions;
  pPartWin->node.pChildren = pChildren;
  splSetParent((SLogicNode*)pPartWin);

  int32_t index = 0;
  int32_t code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, &pMergeWindow->pFuncs);
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplAppendWStart(pPartWin->pFuncs, &index);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = createColumnByRewriteExprs(pPartWin->pFuncs, &pPartWin->node.pTargets);
  }
  if (TSDB_CODE_SUCCESS == code) {
    nodesDestroyNode(pMergeWindow->pTspk);
    pMergeWindow->pTspk = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index));
    if (NULL == pMergeWindow->pTspk) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // the original function list has been replaced by the partial/merge lists
  nodesDestroyList(pFunc);
  if (TSDB_CODE_SUCCESS == code) {
    *pPartWindow = (SLogicNode*)pPartWin;
  } else {
    nodesDestroyNode((SNode*)pPartWin);
  }

  return code;
}

X
Xiaoyu Wang 已提交
475 476 477 478 479 480 481 482 483 484 485
// Returns the number of vgroups of the scan below pNode, following single-child
// nodes only. Returns 0 when no scan is reached that way or when the scan has no
// vgroup list (the same NULL guard stbSplIsMultiTbScan applies).
static int32_t stbSplGetNumOfVgroups(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    SScanLogicNode* pScan = (SScanLogicNode*)pNode;
    return (NULL != pScan->pVgroupList) ? pScan->pVgroupList->numOfVgroups : 0;
  } else {
    if (1 == LIST_LENGTH(pNode->pChildren)) {
      return stbSplGetNumOfVgroups((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
    }
  }
  return 0;
}

X
Xiaoyu Wang 已提交
486
static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
487
                                     SNodeList* pMergeKeys, SLogicNode* pPartChild, bool groupSort) {
488
  SMergeLogicNode* pMerge = (SMergeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE);
X
Xiaoyu Wang 已提交
489 490 491
  if (NULL == pMerge) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
492
  pMerge->numOfChannels = stbSplGetNumOfVgroups(pPartChild);
X
Xiaoyu Wang 已提交
493 494
  pMerge->srcGroupId = pCxt->groupId;
  pMerge->node.precision = pPartChild->precision;
X
Xiaoyu Wang 已提交
495
  pMerge->pMergeKeys = pMergeKeys;
496
  pMerge->groupSort = groupSort;
X
Xiaoyu Wang 已提交
497 498 499

  int32_t code = TSDB_CODE_SUCCESS;
  pMerge->pInputs = nodesCloneList(pPartChild->pTargets);
X
Xiaoyu Wang 已提交
500
  // NULL != pSubplan means 'merge node' replaces 'split node'.
501 502 503 504 505
  if (NULL == pSubplan) {
    pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets);
  } else {
    pMerge->node.pTargets = nodesCloneList(pSplitNode->pTargets);
  }
X
Xiaoyu Wang 已提交
506 507 508
  if (NULL == pMerge->node.pTargets || NULL == pMerge->pInputs) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
509 510 511 512 513 514
  if (TSDB_CODE_SUCCESS == code && NULL != pSplitNode->pLimit) {
    pMerge->node.pLimit = nodesCloneNode(pSplitNode->pLimit);
    if (NULL == pMerge->node.pLimit) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }
X
Xiaoyu Wang 已提交
515 516
  if (TSDB_CODE_SUCCESS == code) {
    if (NULL == pSubplan) {
517
      code = nodesListMakeAppend(&pSplitNode->pChildren, (SNode*)pMerge);
X
Xiaoyu Wang 已提交
518
    } else {
X
Xiaoyu Wang 已提交
519
      code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge);
X
Xiaoyu Wang 已提交
520 521 522
    }
  }
  if (TSDB_CODE_SUCCESS != code) {
523
    nodesDestroyNode((SNode*)pMerge);
X
Xiaoyu Wang 已提交
524
  }
X
Xiaoyu Wang 已提交
525
  return code;
X
Xiaoyu Wang 已提交
526 527
}

X
Xiaoyu Wang 已提交
528 529 530 531
// Creates an exchange node for pPartChild and appends it as a child of pParent.
// If the append fails, the exchange node is destroyed here, since it is not linked
// into any tree and is not returned to the caller.
static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent, SLogicNode* pPartChild) {
  SExchangeLogicNode* pExchange = NULL;
  int32_t             code = splCreateExchangeNode(pCxt, pPartChild, &pExchange);
  if (TSDB_CODE_SUCCESS == code) {
    pExchange->node.pParent = pParent;
    code = nodesListMakeAppend(&pParent->pChildren, (SNode*)pExchange);
    if (TSDB_CODE_SUCCESS != code) {
      nodesDestroyNode((SNode*)pExchange);
    }
  }
  return code;
}

538
// Appends to *pMergeKeys one ORDER BY expression over a clone of pPrimaryKey with
// the given order and NULL_ORDER_FIRST. The list is created on demand by the
// append helper. Returns TSDB_CODE_OUT_OF_MEMORY when the key or the clone cannot
// be allocated, otherwise the result of the append.
static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, EOrder order, SNodeList** pMergeKeys) {
  SOrderByExprNode* pKey = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
  if (NULL == pKey) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pKey->pExpr = nodesCloneNode(pPrimaryKey);
  if (NULL != pKey->pExpr) {
    pKey->order = order;
    pKey->nullOrder = NULL_ORDER_FIRST;
    return nodesListMakeStrictAppend(pMergeKeys, (SNode*)pKey);
  }

  nodesDestroyNode((SNode*)pKey);
  return TSDB_CODE_OUT_OF_MEMORY;
}

553
// Batch-query split of an interval window: the partial window (INTERVAL_ALGO_HASH)
// moves into a new scan subplan, and the original window (INTERVAL_ALGO_MERGE) reads
// it through a merge node ordered by the window's pTspk / outputTsOrder.
// The parent subplan is marked MERGE and the group id is advanced even on failure.
static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SWindowLogicNode* pMergeWin = (SWindowLogicNode*)pInfo->pSplitNode;
  SLogicNode*       pPartWindow = NULL;

  int32_t code = stbSplCreatePartWindowNode(pMergeWin, &pPartWindow);
  if (TSDB_CODE_SUCCESS == code) {
    ((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_HASH;
    pMergeWin->windowAlgo = INTERVAL_ALGO_MERGE;

    SNodeList* pMergeKeys = NULL;
    code = stbSplCreateMergeKeysByPrimaryKey(pMergeWin->pTspk, pMergeWin->outputTsOrder, &pMergeKeys);
    if (TSDB_CODE_SUCCESS == code) {
      code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, true);
    }
    if (TSDB_CODE_SUCCESS != code) {
      nodesDestroyList(pMergeKeys);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}

578
// Stream-query split of an interval window: the partial window
// (INTERVAL_ALGO_STREAM_SEMI) moves into a new scan subplan, and the original window
// (INTERVAL_ALGO_STREAM_FINAL) reads it through an exchange node.
// The parent subplan is marked MERGE and the group id is advanced even on failure.
static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SWindowLogicNode* pFinalWin = (SWindowLogicNode*)pInfo->pSplitNode;
  SLogicNode*       pPartWindow = NULL;

  int32_t code = stbSplCreatePartWindowNode(pFinalWin, &pPartWindow);
  if (TSDB_CODE_SUCCESS == code) {
    ((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_STREAM_SEMI;
    pFinalWin->windowAlgo = INTERVAL_ALGO_STREAM_FINAL;
    code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
  }

  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}

595
// Splits an interval window with the stream or the batch strategy, depending on the query kind.
static int32_t stbSplSplitInterval(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  return pCxt->pPlanCxt->streamQuery ? stbSplSplitIntervalForStream(pCxt, pInfo)
                                     : stbSplSplitIntervalForBatch(pCxt, pInfo);
}

603 604 605 606
// Stream-query split of a session window: the partial window
// (SESSION_ALGO_STREAM_SEMI) moves into a new scan subplan, and the original window
// (SESSION_ALGO_STREAM_FINAL) reads it through an exchange node. A _wend function is
// ensured on the partial side and the final window's pTsEnd is replaced by a clone of
// the matching partial target.
// A single 'code' variable is used throughout (no shadowing), so every failure is
// propagated and each step only runs when all previous steps succeeded.
// The parent subplan is marked MERGE and the group id is advanced even on failure.
static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pPartWindow = NULL;
  int32_t     code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
  if (TSDB_CODE_SUCCESS == code) {
    SWindowLogicNode* pPartWin = (SWindowLogicNode*)pPartWindow;
    SWindowLogicNode* pMergeWin = (SWindowLogicNode*)pInfo->pSplitNode;
    pPartWin->windowAlgo = SESSION_ALGO_STREAM_SEMI;
    pMergeWin->windowAlgo = SESSION_ALGO_STREAM_FINAL;
    int32_t index = 0;
    code = stbSplAppendWEnd(pPartWin, &index);
    if (TSDB_CODE_SUCCESS == code) {
      nodesDestroyNode(pMergeWin->pTsEnd);
      pMergeWin->pTsEnd = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index));
      if (NULL == pMergeWin->pTsEnd) {
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
                                     (SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}

631
// Switches the scan below pNode to SCAN_TYPE_TABLE_MERGE and enables groupSort when
// the scan has group tags. Only chains of single-child nodes are followed; nothing
// happens if no scan is reached that way.
static void stbSplSetTableMergeScan(SLogicNode* pNode) {
  SLogicNode* pCur = pNode;
  while (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pCur)) {
    if (1 != LIST_LENGTH(pCur->pChildren)) {
      return;
    }
    pCur = (SLogicNode*)nodesListGetNode(pCur->pChildren, 0);
  }

  SScanLogicNode* pScan = (SScanLogicNode*)pCur;
  pScan->scanType = SCAN_TYPE_TABLE_MERGE;
  if (NULL != pScan->pGroupTags) {
    pScan->groupSort = true;
  }
}

// Batch-query split of a session/state window: the window's first child is moved into
// a new scan subplan (switched to table-merge scan) and replaced in the parent subplan
// by a merge node ordered by the window's pTspk / inputTsOrder.
// Ownership of the merge keys: once the merge node has been created it holds the key
// list and is linked into the plan, so the list is only destroyed here when the merge
// node was NOT created. Freeing it after a later failure would leave the merge node
// with a dangling pointer.
static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pWindow = pInfo->pSplitNode;
  SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pWindow->pChildren, 0);

  SNodeList* pMergeKeys = NULL;
  bool       keysOwnedByMerge = false;
  int32_t    code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk,
                                                      ((SWindowLogicNode*)pWindow)->inputTsOrder, &pMergeKeys);

  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild, true);
    keysOwnedByMerge = (TSDB_CODE_SUCCESS == code);
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
                                     (SNode*)splCreateScanSubplan(pCxt, pChild, SPLIT_FLAG_STABLE_SPLIT));
  }

  if (TSDB_CODE_SUCCESS == code) {
    stbSplSetTableMergeScan(pChild);
    pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
    SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
    ++(pCxt->groupId);
  } else if (!keysOwnedByMerge) {
    nodesDestroyList(pMergeKeys);
  }

  return code;
}

674
// Splits a session window with the stream or the batch strategy, depending on the query kind.
static int32_t stbSplSplitSession(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  return pCxt->pPlanCxt->streamQuery ? stbSplSplitSessionForStream(pCxt, pInfo)
                                     : stbSplSplitSessionOrStateForBatch(pCxt, pInfo);
}

// Splitting a state window is not implemented for stream queries: always returns
// TSDB_CODE_PLAN_INTERNAL_ERROR. stbSplNeedSplitWindow() never selects a state
// window of a stream query as a split point.
static int32_t stbSplSplitStateForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
}

// Splits a state window with the stream or the batch strategy, depending on the query kind.
static int32_t stbSplSplitState(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  return pCxt->pPlanCxt->streamQuery ? stbSplSplitStateForStream(pCxt, pInfo)
                                     : stbSplSplitSessionOrStateForBatch(pCxt, pInfo);
}

X
Xiaoyu Wang 已提交
694 695
// Returns the keys the node partitions by: pGroupTags for a scan node,
// pPartitionKeys for a partition node, NULL for every other node type.
// The returned list is the node's own member, not a copy.
static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) {
  switch (nodeType(pNode)) {
    case QUERY_NODE_LOGIC_PLAN_SCAN:
      return ((SScanLogicNode*)pNode)->pGroupTags;
    case QUERY_NODE_LOGIC_PLAN_PARTITION:
      return ((SPartitionLogicNode*)pNode)->pPartitionKeys;
    default:
      break;
  }
  return NULL;
}

// Returns true when pPartKeys holds exactly one key and that key is either a
// function of type FUNCTION_TYPE_TBNAME or a column of type COLUMN_TYPE_TBNAME.
static bool stbSplIsPartTbanme(SNodeList* pPartKeys) {
  if (NULL == pPartKeys || 1 != LIST_LENGTH(pPartKeys)) {
    return false;
  }

  SNode* pKey = nodesListGetNode(pPartKeys, 0);
  if (QUERY_NODE_FUNCTION == nodeType(pKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pKey)->funcType) {
    return true;
  }
  return QUERY_NODE_COLUMN == nodeType(pKey) && COLUMN_TYPE_TBNAME == ((SColumnNode*)pKey)->colType;
}

713
// Returns true when the first child of the window node partitions by table name only.
static bool stbSplIsPartTableWinodw(SWindowLogicNode* pWindow) {
  SLogicNode* pInput = (SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0);
  SNodeList*  pPartKeys = stbSplGetPartKeys(pInput);
  return stbSplIsPartTbanme(pPartKeys);
}

717
// Dispatches the split of a window node by window type (interval, session, state).
// Any other window type yields TSDB_CODE_PLAN_INTERNAL_ERROR.
static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SWindowLogicNode* pWindow = (SWindowLogicNode*)pInfo->pSplitNode;

  if (WINDOW_TYPE_INTERVAL == pWindow->winType) {
    return stbSplSplitInterval(pCxt, pInfo);
  }
  if (WINDOW_TYPE_SESSION == pWindow->winType) {
    return stbSplSplitSession(pCxt, pInfo);
  }
  if (WINDOW_TYPE_STATE == pWindow->winType) {
    return stbSplSplitState(pCxt, pInfo);
  }
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
}

731
// Split of a window that is partitioned by table name.
//  - Stream queries: nothing is split; the subplan is only flagged as processed.
//  - Batch queries: the window (or its FILL parent, when there is one) is moved into
//    a new scan subplan and replaced in the parent subplan by an exchange node.
// If replaceLogicNode() fails the exchange node is destroyed, matching what
// splCreateExchangeNodeForSubplan() does in the same situation.
// The parent subplan is marked MERGE and the group id is advanced even on failure.
static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  if (pCxt->pPlanCxt->streamQuery) {
    SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
    return TSDB_CODE_SUCCESS;
  }

  if (NULL != pInfo->pSplitNode->pParent && QUERY_NODE_LOGIC_PLAN_FILL == nodeType(pInfo->pSplitNode->pParent)) {
    // keep the fill node together with its window in the child subplan
    pInfo->pSplitNode = pInfo->pSplitNode->pParent;
  }
  SExchangeLogicNode* pExchange = NULL;
  int32_t             code = splCreateExchangeNode(pCxt, pInfo->pSplitNode, &pExchange);
  if (TSDB_CODE_SUCCESS == code) {
    code = replaceLogicNode(pInfo->pSubplan, pInfo->pSplitNode, (SLogicNode*)pExchange);
    if (TSDB_CODE_SUCCESS != code) {
      nodesDestroyNode((SNode*)pExchange);
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
                                     (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}

// Splits a window node: windows partitioned by table name take the per-table path,
// all others the cross-table path.
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  bool partByTable = stbSplIsPartTableWinodw((SWindowLogicNode*)pInfo->pSplitNode);
  return partByTable ? stbSplSplitWindowForPartTable(pCxt, pInfo) : stbSplSplitWindowForCrossTable(pCxt, pInfo);
}

X
Xiaoyu Wang 已提交
762 763 764 765 766 767 768 769 770
// Build the partial aggregate node that goes below pMergeAgg, turning pMergeAgg
// itself into the merging aggregate.
//
// The aggregate functions, group keys, targets, children and conditions are
// detached from pMergeAgg before it is cloned, so the clone starts without
// them. Afterwards:
//   - the group keys move to the partial node, and the merge node gets a clone
//     of the columns created from them;
//   - conditions and targets go back to the merge node;
//   - the children move to the partial node;
//   - the original function list is rewritten by stbSplRewriteFuns() into a
//     partial list and a merge list, then destroyed.
//
// pMergeAgg  [in/out] aggregate node being split.
// pOutput    [out]    receives the partial aggregate node on success only.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error reported by
// a helper. On failure nothing detached from pMergeAgg is leaked: anything not
// yet handed to another node is given back to pMergeAgg.
static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) {
  SNodeList* pFunc = pMergeAgg->pAggFuncs;
  pMergeAgg->pAggFuncs = NULL;
  SNodeList* pGroupKeys = pMergeAgg->pGroupKeys;
  pMergeAgg->pGroupKeys = NULL;
  SNodeList* pTargets = pMergeAgg->node.pTargets;
  pMergeAgg->node.pTargets = NULL;
  SNodeList* pChildren = pMergeAgg->node.pChildren;
  pMergeAgg->node.pChildren = NULL;
  SNode* pConditions = pMergeAgg->node.pConditions;
  pMergeAgg->node.pConditions = NULL;

  SAggLogicNode* pPartAgg = (SAggLogicNode*)nodesCloneNode((SNode*)pMergeAgg);
  if (NULL == pPartAgg) {
    // Nothing was built yet: put pMergeAgg back exactly as it was.
    pMergeAgg->pAggFuncs = pFunc;
    pMergeAgg->pGroupKeys = pGroupKeys;
    pMergeAgg->node.pTargets = pTargets;
    pMergeAgg->node.pChildren = pChildren;
    pMergeAgg->node.pConditions = pConditions;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pPartAgg->node.groupAction = GROUP_ACTION_KEEP;

  int32_t code = TSDB_CODE_SUCCESS;
  // true once targets/conditions/children have been attached to their final owner
  bool handedOver = false;

  if (NULL != pGroupKeys) {
    pPartAgg->pGroupKeys = pGroupKeys;
    code = createColumnByRewriteExprs(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets);
  }
  if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
    pMergeAgg->pGroupKeys = nodesCloneList(pPartAgg->node.pTargets);
    if (NULL == pMergeAgg->pGroupKeys) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    pMergeAgg->node.pConditions = pConditions;
    pMergeAgg->node.pTargets = pTargets;
    pPartAgg->node.pChildren = pChildren;
    splSetParent((SLogicNode*)pPartAgg);
    handedOver = true;

    code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, &pMergeAgg->pAggFuncs);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = createColumnByRewriteExprs(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets);
  }

  nodesDestroyList(pFunc);
  if (TSDB_CODE_SUCCESS == code) {
    *pOutput = (SLogicNode*)pPartAgg;
  } else {
    if (!handedOver) {
      // Still owned by nobody: give them back so they are released with pMergeAgg.
      pMergeAgg->node.pConditions = pConditions;
      pMergeAgg->node.pTargets = pTargets;
      pMergeAgg->node.pChildren = pChildren;
    }
    nodesDestroyNode((SNode*)pPartAgg);
  }

  return code;
}

// Split an aggregate node into a partial aggregate (moved into a new scan
// subplan) and a merging aggregate that stays in the current subplan behind an
// exchange node.
//
// The subplan type is set to SUBPLAN_TYPE_MERGE and the group id is advanced
// regardless of the returned code.
static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pPartAgg = NULL;
  int32_t     code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg);
    if (TSDB_CODE_SUCCESS == code) {
      SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT);
      code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
    }
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}

X
Xiaoyu Wang 已提交
830
// Create a column node that refers to pExpr.
//
// When pExpr is itself a column, its db/table/alias/column names are copied;
// otherwise the new column is named after pExpr's alias. The alias name and
// result type are always taken from pExpr.
//
// Returns the new node, or NULL when the node cannot be allocated.
static SNode* stbSplCreateColumnNode(SExprNode* pExpr) {
  SColumnNode* pNewCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
  if (NULL == pNewCol) {
    return NULL;
  }

  if (QUERY_NODE_COLUMN != nodeType(pExpr)) {
    strcpy(pNewCol->colName, pExpr->aliasName);
  } else {
    SColumnNode* pSrcCol = (SColumnNode*)pExpr;
    strcpy(pNewCol->dbName, pSrcCol->dbName);
    strcpy(pNewCol->tableName, pSrcCol->tableName);
    strcpy(pNewCol->tableAlias, pSrcCol->tableAlias);
    strcpy(pNewCol->colName, pSrcCol->colName);
  }

  strcpy(pNewCol->node.aliasName, pExpr->aliasName);
  pNewCol->node.resType = pExpr->resType;
  return (SNode*)pNewCol;
}

// Create an ORDER BY expression over a clone of pCol, copying the ordering and
// null ordering from pSortKey.
//
// Returns the new node, or NULL when allocating the node or cloning pCol fails.
static SNode* stbSplCreateOrderByExpr(SOrderByExprNode* pSortKey, SNode* pCol) {
  SOrderByExprNode* pOrderBy = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
  if (NULL == pOrderBy) {
    return NULL;
  }

  SNode* pExprClone = nodesCloneNode(pCol);
  pOrderBy->pExpr = pExprClone;
  if (NULL == pExprClone) {
    nodesDestroyNode((SNode*)pOrderBy);
    return NULL;
  }

  pOrderBy->order = pSortKey->order;
  pOrderBy->nullOrder = pSortKey->nullOrder;
  return (SNode*)pOrderBy;
}

// Build the merge-key list that corresponds to pSortKeys.
//
// For each sort key, every target that either equals the sort expression (when
// that expression is a column) or whose column name equals the sort
// expression's alias yields one merge key. If no target matches, a new column
// node is created for the sort expression, used as the merge key, and appended
// to pTargets.
//
// pOutput receives the list on success only; on failure the partial list is
// destroyed and the error code is returned.
static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets, SNodeList** pOutput) {
  SNodeList* pKeys = NULL;
  int32_t    code = TSDB_CODE_SUCCESS;
  SNode*     pKeyNode = NULL;

  FOREACH(pKeyNode, pSortKeys) {
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)pKeyNode;
    SExprNode*        pSortExpr = (SExprNode*)pSortKey->pExpr;
    bool              matched = false;
    SNode*            pTarget = NULL;

    FOREACH(pTarget, pTargets) {
      bool sameColumn = QUERY_NODE_COLUMN == nodeType(pSortExpr) && nodesEqualNode((SNode*)pSortExpr, pTarget);
      if (sameColumn || 0 == strcmp(pSortExpr->aliasName, ((SColumnNode*)pTarget)->colName)) {
        code = nodesListMakeStrictAppend(&pKeys, stbSplCreateOrderByExpr(pSortKey, pTarget));
        if (TSDB_CODE_SUCCESS != code) {
          break;
        }
        matched = true;
      }
    }

    if (TSDB_CODE_SUCCESS == code && !matched) {
      // No target carries this sort expression yet: add a column for it.
      SNode* pNewCol = stbSplCreateColumnNode(pSortExpr);
      code = nodesListMakeStrictAppend(&pKeys, stbSplCreateOrderByExpr(pSortKey, pNewCol));
      if (TSDB_CODE_SUCCESS != code) {
        nodesDestroyNode(pNewCol);
      } else {
        code = nodesListStrictAppend(pTargets, pNewCol);
      }
    }

    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyList(pKeys);
    return code;
  }
  *pOutput = pKeys;
  return code;
}

// Build the partial sort node that takes over pSort's sort keys and children,
// together with the merge keys derived from those sort keys.
//
// The sort keys and children are detached from pSort before it is cloned, so
// the clone starts without them; both are then attached to the clone.
//
// pSort             [in/out] sort node being split.
// pOutputPartSort   [out]    receives the partial sort node on success only.
// pOutputMergeKeys  [out]    receives the merge-key list on success only.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the clone fails (in
// which case pSort gets its sort keys and children back), or the error from
// stbSplCreateMergeKeys().
static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOutputPartSort,
                                        SNodeList** pOutputMergeKeys) {
  SNodeList* pSortKeys = pSort->pSortKeys;
  pSort->pSortKeys = NULL;
  SNodeList* pChildren = pSort->node.pChildren;
  pSort->node.pChildren = NULL;

  SSortLogicNode* pPartSort = (SSortLogicNode*)nodesCloneNode((SNode*)pSort);
  if (NULL == pPartSort) {
    // Nothing took ownership of the detached lists: restore them instead of leaking.
    pSort->pSortKeys = pSortKeys;
    pSort->node.pChildren = pChildren;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pMergeKeys = NULL;
  pPartSort->node.pChildren = pChildren;
  splSetParent((SLogicNode*)pPartSort);
  pPartSort->pSortKeys = pSortKeys;
  pPartSort->groupSort = pSort->groupSort;
  int32_t code = stbSplCreateMergeKeys(pPartSort->pSortKeys, pPartSort->node.pTargets, &pMergeKeys);

  if (TSDB_CODE_SUCCESS == code) {
    *pOutputPartSort = (SLogicNode*)pPartSort;
    *pOutputMergeKeys = pMergeKeys;
  } else {
    nodesDestroyNode((SNode*)pPartSort);
    nodesDestroyList(pMergeKeys);
  }

  return code;
}

936 937 938 939 940 941 942 943 944 945 946 947 948
// Walk down from pNode through single-child nodes until a scan node is
// reached; if that scan has group tags, turn on its groupSort flag.
// The walk stops without effect at the first non-scan node that does not have
// exactly one child.
static void stbSplSetScanPartSort(SLogicNode* pNode) {
  while (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) {
    if (1 != LIST_LENGTH(pNode->pChildren)) {
      return;
    }
    pNode = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
  }

  SScanLogicNode* pScan = (SScanLogicNode*)pNode;
  if (NULL != pScan->pGroupTags) {
    pScan->groupSort = true;
  }
}

X
Xiaoyu Wang 已提交
949 950
// Split a sort node: a partial sort moves into a new scan subplan and a merge
// node built from the derived merge keys takes the sort's place.
//
// The original sort node is destroyed once the merge node has been created.
// The subplan type is set to SUBPLAN_TYPE_MERGE and the group id is advanced
// regardless of the returned code.
static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SSortLogicNode* pSort = (SSortLogicNode*)pInfo->pSplitNode;
  // Remembered up front because the sort node is destroyed below.
  bool        groupSort = pSort->groupSort;
  SLogicNode* pPartSort = NULL;
  SNodeList*  pMergeKeys = NULL;

  int32_t code = stbSplCreatePartSortNode(pSort, &pPartSort, &pMergeKeys);
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pPartSort, groupSort);
  }
  if (TSDB_CODE_SUCCESS == code) {
    nodesDestroyNode((SNode*)pInfo->pSplitNode);
    if (groupSort) {
      stbSplSetScanPartSort(pPartSort);
    }
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pPartSort, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}

970
// Split a scan node that has no group tags.
//
// When the scan's parent is a PROJECT without limit and slimit, the split is
// made at the project instead. In that case a scan-level limit is cloned onto
// the project, and the scan's own limit is widened to (limit + offset) with
// its offset reset to 0.
//
// Returns TSDB_CODE_OUT_OF_MEMORY immediately (group id untouched) when the
// limit cannot be cloned; otherwise the group id is advanced regardless of the
// returned code.
static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pScanNode = pInfo->pSplitNode;
  SLogicNode* pParent = pScanNode->pParent;
  SLogicNode* pSplitNode = pScanNode;

  bool splitAtProject = NULL != pParent && QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pParent) &&
                        NULL == pParent->pLimit && NULL == pParent->pSlimit;
  if (splitAtProject) {
    pSplitNode = pParent;
    if (NULL != pScanNode->pLimit) {
      pSplitNode->pLimit = nodesCloneNode(pScanNode->pLimit);
      if (NULL == pSplitNode->pLimit) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      SLimitNode* pScanLimit = (SLimitNode*)pScanNode->pLimit;
      pScanLimit->limit += pScanLimit->offset;
      pScanLimit->offset = 0;
    }
  }

  int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pSplitNode, SUBPLAN_TYPE_MERGE);
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pSplitNode, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
  }
  ++(pCxt->groupId);
  return code;
}

993
// Split a scan node that has group tags by putting a merge node (created
// without merge keys, with group sort enabled) in front of it.
//
// When the scan's parent is a PROJECT without limit and slimit, the split is
// made at the project instead. The subplan type is set to SUBPLAN_TYPE_MERGE
// and the group id is advanced regardless of the returned code.
static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pParent = pInfo->pSplitNode->pParent;
  SLogicNode* pSplitNode = pInfo->pSplitNode;
  if (NULL != pParent && QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pParent) && NULL == pParent->pLimit &&
      NULL == pParent->pSlimit) {
    pSplitNode = pParent;
  }

  int32_t code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pSplitNode, NULL, pSplitNode, true);
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pSplitNode, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}

X
Xiaoyu Wang 已提交
1009
// Locate the primary-key timestamp column among the scan columns of pScan and
// make sure an equal node is present in the scan's targets, appending a clone
// of it when it is missing.
//
// Returns the column node from pScan->pScanCols (still owned by the scan), or
// NULL when the scan has no such column or when the clone could not be
// appended to the targets.
static SNode* stbSplFindPrimaryKeyFromScan(SScanLogicNode* pScan) {
  SNode* pPrimaryKey = NULL;
  SNode* pCol = NULL;
  FOREACH(pCol, pScan->pScanCols) {
    if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pCol)->colId) {
      pPrimaryKey = pCol;
      break;
    }
  }
  if (NULL == pPrimaryKey) {
    return NULL;
  }

  SNode* pTarget = NULL;
  FOREACH(pTarget, pScan->node.pTargets) {
    if (nodesEqualNode(pTarget, pPrimaryKey)) {
      return pPrimaryKey;
    }
  }

  // Do not report the key as available when it could not be added to the targets.
  if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pScan->node.pTargets, nodesCloneNode(pPrimaryKey))) {
    return NULL;
  }
  return pPrimaryKey;
}

1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046
// Clone pScan into a table-merge scan that takes over pScan's children, and
// build merge keys on its primary-key column. The merge order is ORDER_ASC
// when scanSeq[0] > 0 and ORDER_DESC otherwise.
//
// pScan             [in/out] scan node being replaced; its children are moved
//                            to the clone.
// pOutputMergeScan  [out]    receives the new scan node on success only.
// pOutputMergeKeys  [out]    receives the merge-key list on success only.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the clone fails (in
// which case pScan gets its children back), or the error from
// stbSplCreateMergeKeysByPrimaryKey().
static int32_t stbSplCreateMergeScanNode(SScanLogicNode* pScan, SLogicNode** pOutputMergeScan,
                                         SNodeList** pOutputMergeKeys) {
  SNodeList* pChildren = pScan->node.pChildren;
  pScan->node.pChildren = NULL;

  SScanLogicNode* pMergeScan = (SScanLogicNode*)nodesCloneNode((SNode*)pScan);
  if (NULL == pMergeScan) {
    // Nothing took ownership of the detached children: restore them instead of leaking.
    pScan->node.pChildren = pChildren;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SNodeList* pMergeKeys = NULL;
  pMergeScan->scanType = SCAN_TYPE_TABLE_MERGE;
  pMergeScan->node.pChildren = pChildren;
  splSetParent((SLogicNode*)pMergeScan);
  int32_t code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pMergeScan),
                                                   pMergeScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, &pMergeKeys);

  if (TSDB_CODE_SUCCESS == code) {
    *pOutputMergeScan = (SLogicNode*)pMergeScan;
    *pOutputMergeKeys = pMergeKeys;
  } else {
    nodesDestroyNode((SNode*)pMergeScan);
    nodesDestroyList(pMergeKeys);
  }

  return code;
}

1062 1063
// Replace pScan with a merge node fed by a table-merge scan that lives in a
// new scan subplan.
//
// A limit on the new scan is widened to (limit + offset) and its offset reset
// to 0 before the merge node is created. pScan is destroyed once the merge
// node exists. The group id is advanced regardless of the returned code.
static int32_t stbSplSplitMergeScanNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan,
                                        bool groupSort) {
  SLogicNode* pMergeScan = NULL;
  SNodeList*  pMergeKeys = NULL;

  int32_t code = stbSplCreateMergeScanNode(pScan, &pMergeScan, &pMergeKeys);
  if (TSDB_CODE_SUCCESS == code) {
    SLimitNode* pLimit = (SLimitNode*)pMergeScan->pLimit;
    if (NULL != pLimit) {
      pLimit->limit += pLimit->offset;
      pLimit->offset = 0;
    }
    code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, pMergeScan, groupSort);
  }
  if (TSDB_CODE_SUCCESS == code) {
    nodesDestroyNode((SNode*)pScan);
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pMergeScan, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  ++(pCxt->groupId);
  return code;
}

1083 1084 1085 1086
// Split a scan node, picking the strategy from the scan itself:
//   - table-merge scan: merge-scan split (the subplan becomes a MERGE subplan);
//   - scan with group tags: split with part tags;
//   - anything else: split without part tags.
static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SScanLogicNode* pScan = (SScanLogicNode*)pInfo->pSplitNode;

  if (SCAN_TYPE_TABLE_MERGE != pScan->scanType) {
    return (NULL != pScan->pGroupTags) ? stbSplSplitScanNodeWithPartTags(pCxt, pInfo)
                                       : stbSplSplitScanNodeWithoutPartTags(pCxt, pInfo);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  return stbSplSplitMergeScanNode(pCxt, pInfo->pSubplan, pScan, true);
}

X
Xiaoyu Wang 已提交
1095 1096 1097 1098 1099
// Split every input of a join: scan children are split as merge scans (without
// group sort), nested joins are handled recursively.
//
// Stops at the first failure. A child that is neither a scan nor a join yields
// TSDB_CODE_PLAN_INTERNAL_ERROR.
static int32_t stbSplSplitJoinNodeImpl(SSplitContext* pCxt, SLogicSubplan* pSubplan, SJoinLogicNode* pJoin) {
  int32_t code = TSDB_CODE_SUCCESS;
  SNode*  pInput = NULL;

  FOREACH(pInput, pJoin->node.pChildren) {
    switch (nodeType(pInput)) {
      case QUERY_NODE_LOGIC_PLAN_SCAN:
        code = stbSplSplitMergeScanNode(pCxt, pSubplan, (SScanLogicNode*)pInput, false);
        break;
      case QUERY_NODE_LOGIC_PLAN_JOIN:
        code = stbSplSplitJoinNodeImpl(pCxt, pSubplan, (SJoinLogicNode*)pInput);
        break;
      default:
        code = TSDB_CODE_PLAN_INTERNAL_ERROR;
        break;
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }
  return code;
}

X
Xiaoyu Wang 已提交
1113
// Split the inputs of a join node. Only on success is the subplan turned into
// a MERGE subplan and flagged with SPLIT_FLAG_STABLE_SPLIT.
static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  int32_t code = stbSplSplitJoinNodeImpl(pCxt, pInfo->pSubplan, (SJoinLogicNode*)pInfo->pSplitNode);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
  return code;
}

1122
// Build merge keys for a partition node from the primary-key column of its
// first child, which is treated as a scan node (NOTE(review): the child's node
// type is not checked here — confirm callers guarantee it).
//
// A clone of the primary key is appended to the partition node's targets, and
// the merge keys are created on it. The merge order is ORDER_ASC when the
// scan's scanSeq[0] > 0 and ORDER_DESC otherwise.
//
// Returns TSDB_CODE_OUT_OF_MEMORY when no primary key is found or it cannot be
// cloned, otherwise the code of the append / merge-key creation.
static int32_t stbSplCreateMergeKeysForPartitionNode(SLogicNode* pPart, SNodeList** pMergeKeys) {
  SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pPart->pChildren, 0);
  SNode*          pPrimaryKey = nodesCloneNode(stbSplFindPrimaryKeyFromScan(pScan));
  if (NULL == pPrimaryKey) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  // Strict append: the clone is released if it cannot be added, so it is not leaked.
  int32_t code = nodesListStrictAppend(pPart->pTargets, pPrimaryKey);
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateMergeKeysByPrimaryKey(pPrimaryKey, pScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, pMergeKeys);
  }
  return code;
}

1135
// Split a partition node by putting a merge node (group sort enabled) in front
// of it and moving it into a new scan subplan.
//
// Merge keys are built only when the node requires data order of at least
// DATA_ORDER_LEVEL_IN_GROUP; otherwise the merge node is created without keys.
// The subplan type is set to SUBPLAN_TYPE_MERGE and the group id is advanced
// regardless of the returned code.
static int32_t stbSplSplitPartitionNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pPart = pInfo->pSplitNode;
  SNodeList*  pMergeKeys = NULL;
  int32_t     code = TSDB_CODE_SUCCESS;

  if (pPart->requireDataOrder >= DATA_ORDER_LEVEL_IN_GROUP) {
    code = stbSplCreateMergeKeysForPartitionNode(pPart, &pMergeKeys);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pInfo->pSplitNode, true);
  }
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT);
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  ++(pCxt->groupId);
  return code;
}

X
Xiaoyu Wang 已提交
1153
// "SuperTableSplit" rule: find a node to split via stbSplFindSplitNode() and
// dispatch to the splitter for its node type.
//
// Does nothing for rSma queries or when no node matches. Once a node has
// matched, pCxt->split is set even if the splitter fails or the node type has
// no splitter (in which case TSDB_CODE_SUCCESS is returned).
static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  if (pCxt->pPlanCxt->rSmaQuery) {
    return TSDB_CODE_SUCCESS;
  }

  SStableSplitInfo info = {0};
  bool matched = splMatch(pCxt, pSubplan, SPLIT_FLAG_STABLE_SPLIT, (FSplFindSplitNode)stbSplFindSplitNode, &info);
  if (!matched) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  switch (nodeType(info.pSplitNode)) {
    case QUERY_NODE_LOGIC_PLAN_SCAN:
      code = stbSplSplitScanNode(pCxt, &info);
      break;
    case QUERY_NODE_LOGIC_PLAN_JOIN:
      code = stbSplSplitJoinNode(pCxt, &info);
      break;
    case QUERY_NODE_LOGIC_PLAN_PARTITION:
      code = stbSplSplitPartitionNode(pCxt, &info);
      break;
    case QUERY_NODE_LOGIC_PLAN_AGG:
      code = stbSplSplitAggNode(pCxt, &info);
      break;
    case QUERY_NODE_LOGIC_PLAN_WINDOW:
      code = stbSplSplitWindowNode(pCxt, &info);
      break;
    case QUERY_NODE_LOGIC_PLAN_SORT:
      code = stbSplSplitSortNode(pCxt, &info);
      break;
    default:
      // no splitter for this node type
      break;
  }

  pCxt->split = true;
  return code;
}

X
Xiaoyu Wang 已提交
1191 1192 1193 1194 1195 1196
// Match result of sigTbJoinSplFindSplitNode() for the single-table join split.
typedef struct SSigTbJoinSplitInfo {
  SJoinLogicNode* pJoin;       // the matched single-table join node
  SLogicNode*     pSplitNode;  // child at index 1 of the join; the node that gets split off
  SLogicSubplan*  pSubplan;    // subplan in which the join was found
} SSigTbJoinSplitInfo;

X
Xiaoyu Wang 已提交
1197 1198 1199 1200 1201 1202
// A node needs the single-table join split when it is a join flagged as
// isSingleTableJoin and neither of its first two children is an exchange node.
static bool sigTbJoinSplNeedSplit(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pNode)) {
    SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode;
    if (pJoin->isSingleTableJoin) {
      return QUERY_NODE_LOGIC_PLAN_EXCHANGE != nodeType(nodesListGetNode(pJoin->node.pChildren, 0)) &&
             QUERY_NODE_LOGIC_PLAN_EXCHANGE != nodeType(nodesListGetNode(pJoin->node.pChildren, 1));
    }
  }
  return false;
}

X
Xiaoyu Wang 已提交
1210 1211 1212 1213 1214
// splMatch() callback: when pNode qualifies per sigTbJoinSplNeedSplit(), record
// the join, its child at index 1 (the node to split off) and the subplan in
// pInfo and return true; otherwise leave pInfo untouched and return false.
static bool sigTbJoinSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
                                      SSigTbJoinSplitInfo* pInfo) {
  if (!sigTbJoinSplNeedSplit(pNode)) {
    return false;
  }

  pInfo->pJoin = (SJoinLogicNode*)pNode;
  pInfo->pSplitNode = (SLogicNode*)nodesListGetNode(pNode->pChildren, 1);
  pInfo->pSubplan = pSubplan;
  return true;
}

X
Xiaoyu Wang 已提交
1221 1222 1223
// "SingleTableJoinSplit" rule: move the matched join input into its own scan
// subplan, leaving an exchange node in its place. The current subplan keeps
// its subplan type.
//
// Once a node has matched, the group id is advanced and pCxt->split is set
// regardless of the returned code.
static int32_t singleTableJoinSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SSigTbJoinSplitInfo info = {0};
  bool matched = splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)sigTbJoinSplFindSplitNode, &info);
  if (!matched) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType);
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, info.pSplitNode, 0);
    code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)pScanSubplan);
  }

  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}

1235 1236 1237 1238 1239 1240 1241 1242
// Turn every child of pSplitNode into its own subplan under pUnionSubplan.
//
// The previous children of pUnionSubplan are detached first and passed to
// splMountSubplan() for each new subplan. Each child that is split off is
// replaced by NULL in pSplitNode's child list, and the group id is advanced
// once per successfully processed child.
//
// On success the detached old child list and pSplitNode's (now emptied) child
// list are destroyed. On failure processing stops and the error is returned
// without that cleanup.
static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubplan, SLogicNode* pSplitNode) {
  SNodeList* pOldChildren = pUnionSubplan->pChildren;
  pUnionSubplan->pChildren = NULL;

  int32_t code = TSDB_CODE_SUCCESS;
  SNode*  pBranch = NULL;
  FOREACH(pBranch, pSplitNode->pChildren) {
    SLogicSubplan* pBranchSubplan = splCreateSubplan(pCxt, (SLogicNode*)pBranch);
    code = nodesListMakeStrictAppend(&pUnionSubplan->pChildren, (SNode*)pBranchSubplan);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
    // The branch now belongs to the new subplan; drop it from the split node.
    REPLACE_NODE(NULL);
    code = splMountSubplan(pBranchSubplan, pOldChildren);
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
    ++(pCxt->groupId);
  }

  if (TSDB_CODE_SUCCESS == code) {
    nodesDestroyList(pOldChildren);
    NODES_DESTORY_LIST(pSplitNode->pChildren);
  }
  return code;
}

X
Xiaoyu Wang 已提交
1261 1262 1263 1264 1265
// Match result of unAllSplFindSplitNode() for the UNION ALL split.
typedef struct SUnionAllSplitInfo {
  SProjectLogicNode* pProject;  // matched project node that has more than one child
  SLogicSubplan*     pSubplan;  // subplan in which the project node was found
} SUnionAllSplitInfo;

X
Xiaoyu Wang 已提交
1266 1267
// splMatch() callback: matches a PROJECT node with more than one child and
// records it, together with the subplan, in pInfo.
static bool unAllSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
                                  SUnionAllSplitInfo* pInfo) {
  if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || LIST_LENGTH(pNode->pChildren) <= 1) {
    return false;
  }

  pInfo->pProject = (SProjectLogicNode*)pNode;
  pInfo->pSubplan = pSubplan;
  return true;
}

1276 1277
// Replace the UNION ALL project node with an exchange node that reads from the
// source groups [startGroupId, pCxt->groupId - 1].
//
// The exchange node copies the project's precision, gets a clone of its
// targets, and takes over its limit (swapped). The subplan becomes a MERGE
// subplan. If the project is the subplan root, the exchange becomes the new
// root; otherwise it replaces the first child of the project's parent that
// compares equal to the project. The replaced project node is destroyed.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation/clone
// failure, or TSDB_CODE_PLAN_INTERNAL_ERROR when the project cannot be found
// under its parent. The exchange node is released on every failure path.
static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGroupId, SLogicSubplan* pSubplan,
                                          SProjectLogicNode* pProject) {
  SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pExchange->srcStartGroupId = startGroupId;
  pExchange->srcEndGroupId = pCxt->groupId - 1;
  pExchange->node.precision = pProject->node.precision;
  pExchange->node.pTargets = nodesCloneList(pProject->node.pTargets);
  if (NULL == pExchange->node.pTargets) {
    // The exchange node is not attached anywhere yet; free it instead of leaking.
    nodesDestroyNode((SNode*)pExchange);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  TSWAP(pExchange->node.pLimit, pProject->node.pLimit);

  pSubplan->subplanType = SUBPLAN_TYPE_MERGE;

  if (NULL == pProject->node.pParent) {
    pSubplan->pNode = (SLogicNode*)pExchange;
    nodesDestroyNode((SNode*)pProject);
    return TSDB_CODE_SUCCESS;
  }

  SNode* pNode;
  FOREACH(pNode, pProject->node.pParent->pChildren) {
    if (nodesEqualNode(pNode, (SNode*)pProject)) {
      REPLACE_NODE(pExchange);
      nodesDestroyNode(pNode);
      return TSDB_CODE_SUCCESS;
    }
  }
  nodesDestroyNode((SNode*)pExchange);
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
}

X
Xiaoyu Wang 已提交
1311 1312
// "UnionAllSplit" rule: split every input of the matched project node into its
// own subplan, then replace the project with an exchange node covering the
// group ids allocated during the split.
//
// Once a node has matched, pCxt->split is set regardless of the returned code.
static int32_t unionAllSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SUnionAllSplitInfo info = {0};
  bool matched = splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unAllSplFindSplitNode, &info);
  if (!matched) {
    return TSDB_CODE_SUCCESS;
  }

  // First group id handed out to the new subplans.
  int32_t firstGroupId = pCxt->groupId;
  int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pProject);
  if (TSDB_CODE_SUCCESS == code) {
    code = unAllSplCreateExchangeNode(pCxt, firstGroupId, info.pSubplan, info.pProject);
  }

  pCxt->split = true;
  return code;
}

X
Xiaoyu Wang 已提交
1326 1327 1328 1329 1330
// Match result of unDistSplFindSplitNode() for the UNION (distinct) split.
typedef struct SUnionDistinctSplitInfo {
  SAggLogicNode* pAgg;      // matched aggregate node that has more than one child
  SLogicSubplan* pSubplan;  // subplan in which the aggregate node was found
} SUnionDistinctSplitInfo;

1331 1332
// Give the UNION (distinct) aggregate node an exchange child that reads from
// the source groups [startGroupId, pCxt->groupId - 1].
//
// The exchange node copies the aggregate's precision and gets a clone of its
// group keys as targets. The subplan becomes a MERGE subplan before the
// exchange node is appended to the aggregate's children.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation/clone
// failure, or the error of the append. The exchange node is released on every
// failure path.
static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGroupId, SLogicSubplan* pSubplan,
                                           SAggLogicNode* pAgg) {
  SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pExchange->srcStartGroupId = startGroupId;
  pExchange->srcEndGroupId = pCxt->groupId - 1;
  pExchange->node.precision = pAgg->node.precision;
  pExchange->node.pTargets = nodesCloneList(pAgg->pGroupKeys);
  if (NULL == pExchange->node.pTargets) {
    // The exchange node is not attached anywhere yet; free it instead of leaking.
    nodesDestroyNode((SNode*)pExchange);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pSubplan->subplanType = SUBPLAN_TYPE_MERGE;

  // Strict append: the exchange node is released if it cannot be added.
  return nodesListMakeStrictAppend(&pAgg->node.pChildren, (SNode*)pExchange);
}

X
Xiaoyu Wang 已提交
1350 1351 1352 1353
// splMatch() callback: matches an AGG node with more than one child and
// records it, together with the subplan, in pInfo.
static bool unDistSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
                                   SUnionDistinctSplitInfo* pInfo) {
  if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || LIST_LENGTH(pNode->pChildren) <= 1) {
    return false;
  }

  pInfo->pAgg = (SAggLogicNode*)pNode;
  pInfo->pSubplan = pSubplan;
  return true;
}

X
Xiaoyu Wang 已提交
1360 1361
// "UnionDistinctSplit" rule: split every input of the matched aggregate node
// into its own subplan, then give the aggregate an exchange child covering the
// group ids allocated during the split.
//
// Once a node has matched, pCxt->split is set regardless of the returned code.
static int32_t unionDistinctSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SUnionDistinctSplitInfo info = {0};
  bool matched = splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unDistSplFindSplitNode, &info);
  if (!matched) {
    return TSDB_CODE_SUCCESS;
  }

  // First group id handed out to the new subplans.
  int32_t firstGroupId = pCxt->groupId;
  int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pAgg);
  if (TSDB_CODE_SUCCESS == code) {
    code = unDistSplCreateExchangeNode(pCxt, firstGroupId, info.pSubplan, info.pAgg);
  }

  pCxt->split = true;
  return code;
}

X
Xiaoyu Wang 已提交
1375 1376 1377 1378 1379
// Match result of smaIdxSplFindSplitNode() for the SMA index split.
typedef struct SSmaIndexSplitInfo {
  SMergeLogicNode* pMerge;    // matched merge node that has more than one child
  SLogicSubplan*   pSubplan;  // subplan in which the merge node was found
} SSmaIndexSplitInfo;

X
Xiaoyu Wang 已提交
1380 1381
// splMatch() callback: matches a MERGE node with more than one child and
// records it, together with the subplan, in pInfo.
static bool smaIdxSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
                                   SSmaIndexSplitInfo* pInfo) {
  if (QUERY_NODE_LOGIC_PLAN_MERGE != nodeType(pNode) || LIST_LENGTH(pNode->pChildren) <= 1) {
    return false;
  }

  pInfo->pMerge = (SMergeLogicNode*)pNode;
  pInfo->pSubplan = pSubplan;
  return true;
}

// "SmaIndexSplit" rule: split every input of the matched merge node into its
// own subplan and, on success, record the current group id as the merge node's
// source group id.
//
// Once a node has matched, the group id is advanced and pCxt->split is set
// regardless of the returned code.
static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SSmaIndexSplitInfo info = {0};
  bool matched = splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)smaIdxSplFindSplitNode, &info);
  if (!matched) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pMerge);
  if (TSDB_CODE_SUCCESS == code) {
    info.pMerge->srcGroupId = pCxt->groupId;
  }

  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}

1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426
// Match result of insSelSplFindSplitNode() for the INSERT ... SELECT split.
typedef struct SInsertSelectSplitInfo {
  SLogicNode*    pQueryRoot;  // the single child of the matched insert-type vnode-modify node
  SLogicSubplan* pSubplan;    // subplan in which the vnode-modify node was found
} SInsertSelectSplitInfo;

// splMatch() callback: matches a VNODE_MODIFY node of insert type that has
// exactly one child, and records that child as the query root, together with
// the subplan, in pInfo.
static bool insSelSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
                                   SInsertSelectSplitInfo* pInfo) {
  if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) ||
      MODIFY_TABLE_TYPE_INSERT != ((SVnodeModifyLogicNode*)pNode)->modifyType) {
    return false;
  }

  pInfo->pQueryRoot = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
  pInfo->pSubplan = pSubplan;
  return true;
}

// "InsertSelectSplit" rule: move the query under an insert-type modify node
// into its own subplan, leaving an exchange node in its place and turning the
// current subplan into a MODIFY subplan.
//
// The child list pointer the subplan had on entry is passed to
// splMountSubplan() for the new subplan. Once a node has matched, the subplan
// is flagged with SPLIT_FLAG_INSERT_SPLIT, the group id is advanced and
// pCxt->split is set regardless of the returned code.
static int32_t insertSelectSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SInsertSelectSplitInfo info = {0};
  bool matched = splMatch(pCxt, pSubplan, SPLIT_FLAG_INSERT_SPLIT, (FSplFindSplitNode)insSelSplFindSplitNode, &info);
  if (!matched) {
    return TSDB_CODE_SUCCESS;
  }

  SNodeList*     pPrevChildren = info.pSubplan->pChildren;
  SLogicSubplan* pQuerySubplan = NULL;

  int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pQueryRoot, SUBPLAN_TYPE_MODIFY);
  if (TSDB_CODE_SUCCESS == code) {
    pQuerySubplan = splCreateSubplan(pCxt, info.pQueryRoot);
    code = (NULL == pQuerySubplan) ? TSDB_CODE_OUT_OF_MEMORY : code;
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)pQuerySubplan);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = splMountSubplan(pQuerySubplan, pPrevChildren);
  }

  SPLIT_FLAG_SET_MASK(info.pSubplan->splitFlag, SPLIT_FLAG_INSERT_SPLIT);
  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}

X
Xiaoyu Wang 已提交
1449 1450 1451 1452 1453 1454 1455
// Match result of qndSplFindSplitNode() for the qnode split.
typedef struct SQnodeSplitInfo {
  SLogicNode*    pSplitNode;  // the matched scan node
  SLogicSubplan* pSubplan;    // subplan in which the scan node was found
} SQnodeSplitInfo;

// splMatch() callback: matches a SCAN node that has a parent other than an
// INTERP_FUNC node and whose scanSeq[0] and scanSeq[1] are both at most 1.
// The scan and the subplan are recorded in pInfo on a match.
static bool qndSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
                                SQnodeSplitInfo* pInfo) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode) || NULL == pNode->pParent ||
      QUERY_NODE_LOGIC_PLAN_INTERP_FUNC == nodeType(pNode->pParent)) {
    return false;
  }

  SScanLogicNode* pScan = (SScanLogicNode*)pNode;
  if (pScan->scanSeq[0] > 1 || pScan->scanSeq[1] > 1) {
    return false;
  }

  pInfo->pSplitNode = pNode;
  pInfo->pSubplan = pSubplan;
  return true;
}

/**
 * Split applied once by applySplitRule() after the rule table has stopped
 * producing splits. It does nothing unless tsQueryPolicy is QUERY_POLICY_QNODE.
 *
 * When qndSplFindSplitNode() matches a scan node, that node's dataRequired is
 * set to FUNC_DATA_REQUIRED_DATA_LOAD, the node is handed to
 * splCreateExchangeNodeForSubplan(), and a scan subplan created from it is
 * appended to the matched subplan's children. The matched subplan's vgroup
 * list (if any) is swapped into the scan subplan, and the matched subplan is
 * marked SUBPLAN_TYPE_COMPUTE.
 *
 * @param pCxt     split context; groupId is incremented and split is set to
 *                 true whenever a match was found.
 * @param pSubplan subplan to search for a split point.
 * @return TSDB_CODE_SUCCESS when the policy is not qnode, nothing matched, or
 *         the split succeeded; TSDB_CODE_OUT_OF_MEMORY when the scan subplan
 *         could not be created; otherwise the failing helper's error code.
 */
static int32_t qnodeSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  if (QUERY_POLICY_QNODE != tsQueryPolicy) {
    return TSDB_CODE_SUCCESS;
  }

  SQnodeSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)qndSplFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }
  ((SScanLogicNode*)info.pSplitNode)->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;
  // The exchange node is created with the subplan's current type; the type is switched to COMPUTE only afterwards.
  int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType);
  if (TSDB_CODE_SUCCESS == code) {
    SLogicSubplan* pScanSubplan = splCreateScanSubplan(pCxt, info.pSplitNode, 0);
    if (NULL != pScanSubplan) {
      if (NULL != info.pSubplan->pVgroupList) {
        // Read the vgroup count before the swap moves the list over to the scan subplan.
        info.pSubplan->numOfComputeNodes = info.pSubplan->pVgroupList->numOfVgroups;
        TSWAP(pScanSubplan->pVgroupList, info.pSubplan->pVgroupList);
      } else {
        info.pSubplan->numOfComputeNodes = 1;
      }
      code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)pScanSubplan);
    } else {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  // NOTE: the bookkeeping below runs even when code holds an error; the error is still returned to the caller.
  info.pSubplan->subplanType = SUBPLAN_TYPE_COMPUTE;
  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}

X
Xiaoyu Wang 已提交
1497 1498 1499 1500 1501
// clang-format off
static const SSplitRule splitRuleSet[] = {
  {.pName = "SuperTableSplit",      .splitFunc = stableSplit},
  {.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit},
  {.pName = "UnionAllSplit",        .splitFunc = unionAllSplit},
X
Xiaoyu Wang 已提交
1502
  {.pName = "UnionDistinctSplit",   .splitFunc = unionDistinctSplit},
1503
  {.pName = "SmaIndexSplit",        .splitFunc = smaIndexSplit}, // not used yet
1504
  {.pName = "InsertSelectSplit",    .splitFunc = insertSelectSplit}
X
Xiaoyu Wang 已提交
1505 1506
};
// clang-format on
X
Xiaoyu Wang 已提交
1507 1508 1509

static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));

1510
/**
 * Writes a debug-log dump of a logic subplan when tsQueryPlannerTrace is enabled.
 *
 * @param pRuleName name of the split rule that was just applied, or NULL to
 *                  log the plan as it looks before any split.
 * @param pSubplan  subplan to serialize with nodesNodeToString().
 */
static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
  if (!tsQueryPlannerTrace) {
    return;
  }
  char* pStr = NULL;
  nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL);
  if (NULL == pStr) {
    // Nothing was produced; passing a null pointer to a "%s" conversion is undefined behavior.
    return;
  }
  if (NULL == pRuleName) {
    qDebugL("before split: %s", pStr);
  } else {
    qDebugL("apply split %s rule: %s", pRuleName, pStr);
  }
  taosMemoryFree(pStr);
}

1524 1525 1526 1527
/**
 * Runs every rule in splitRuleSet over the subplan repeatedly until a full
 * pass produces no split, then applies qnodeSplit() once.
 *
 * The split context starts with the subplan's queryId and with groupId set to
 * the subplan's groupId + 1.
 *
 * @param pCxt     planner context stored in the split context.
 * @param pSubplan subplan to split in place.
 * @return the first non-success code returned by a rule, otherwise the result
 *         of qnodeSplit().
 */
static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) {
  SSplitContext cxt = {
      .pPlanCxt = pCxt, .queryId = pSubplan->id.queryId, .groupId = pSubplan->id.groupId + 1, .split = false};
  bool anySplit = false;
  dumpLogicSubplan(NULL, pSubplan);
  do {
    anySplit = false;
    for (int32_t i = 0; i < splitRuleNum; ++i) {
      // Each rule reports through cxt.split whether it changed the plan.
      cxt.split = false;
      int32_t code = splitRuleSet[i].splitFunc(&cxt, pSubplan);
      if (TSDB_CODE_SUCCESS != code) {
        return code;
      }
      if (cxt.split) {
        anySplit = true;
        dumpLogicSubplan(splitRuleSet[i].pName, pSubplan);
      }
    }
  } while (anySplit);
  return qnodeSplit(&cxt, pSubplan);
}
X
Xiaoyu Wang 已提交
1545

X
Xiaoyu Wang 已提交
1546 1547 1548 1549 1550 1551 1552 1553 1554 1555
/**
 * Walks the logic-node tree rooted at pNode. At every scan node reached, the
 * scan node's pVgroupList and the subplan's pVgroupList are swapped; the walk
 * does not descend below a scan node.
 *
 * @param pNode    root of the (sub)tree to visit.
 * @param pSubplan subplan whose pVgroupList takes part in the swap.
 */
static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) {
    // Not a scan node: recurse into each child.
    SNode* pSub;
    FOREACH(pSub, pNode->pChildren) { setVgroupsInfo((SLogicNode*)pSub, pSubplan); }
    return;
  }

  SScanLogicNode* pScan = (SScanLogicNode*)pNode;
  TSWAP(pScan->pVgroupList, pSubplan->pVgroupList);
}

1556 1557 1558 1559 1560 1561 1562 1563
/**
 * Decides whether splitLogicPlan() should run the split rules on a subplan.
 *
 * @param pLogicSubplan subplan whose root node is inspected.
 * @return true when the root is not a vnode-modify node, or when it is an
 *         insert-type modify node that has a non-NULL children list;
 *         false otherwise.
 */
static bool needSplitSubplan(SLogicSubplan* pLogicSubplan) {
  if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pLogicSubplan->pNode)) {
    SVnodeModifyLogicNode* pModifyNode = (SVnodeModifyLogicNode*)pLogicSubplan->pNode;
    if (MODIFY_TABLE_TYPE_INSERT != pModifyNode->modifyType) {
      return false;
    }
    return NULL != pModifyNode->node.pChildren;
  }
  return true;
}

X
Xiaoyu Wang 已提交
1564
/**
 * Entry point of the plan splitter.
 *
 * When needSplitSubplan() says the subplan does not need splitting, only
 * setVgroupsInfo() is run on its root node and success is returned. Otherwise
 * the split rules are applied through applySplitRule().
 *
 * @param pCxt          planner context.
 * @param pLogicSubplan subplan to split in place.
 * @return TSDB_CODE_SUCCESS, or the error code returned by applySplitRule().
 */
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
  if (!needSplitSubplan(pLogicSubplan)) {
    setVgroupsInfo(pLogicSubplan->pNode, pLogicSubplan);
    return TSDB_CODE_SUCCESS;
  }
  return applySplitRule(pCxt, pLogicSubplan);
}