planSpliter.c 35.2 KB
Newer Older
X
Xiaoyu Wang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

X
Xiaoyu Wang 已提交
16
#include "functionMgt.h"
X
Xiaoyu Wang 已提交
17
#include "planInt.h"
X
Xiaoyu Wang 已提交
18

X
Xiaoyu Wang 已提交
19
#define SPLIT_FLAG_MASK(n) (1 << n)
X
Xiaoyu Wang 已提交
20

X
Xiaoyu Wang 已提交
21
#define SPLIT_FLAG_STABLE_SPLIT SPLIT_FLAG_MASK(0)
X
Xiaoyu Wang 已提交
22

X
Xiaoyu Wang 已提交
23
#define SPLIT_FLAG_SET_MASK(val, mask)  (val) |= (mask)
X
Xiaoyu Wang 已提交
24 25 26
#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)

typedef struct SSplitContext {
27 28 29 30
  SPlanContext* pPlanCxt;
  uint64_t      queryId;
  int32_t       groupId;
  bool          split;
X
Xiaoyu Wang 已提交
31 32
} SSplitContext;

33
typedef int32_t (*FSplit)(SSplitContext* pCxt, SLogicSubplan* pSubplan);
X
Xiaoyu Wang 已提交
34 35

typedef struct SSplitRule {
X
Xiaoyu Wang 已提交
36
  char*  pName;
X
Xiaoyu Wang 已提交
37 38 39
  FSplit splitFunc;
} SSplitRule;

40
typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, void* pInfo);
X
Xiaoyu Wang 已提交
41

X
Xiaoyu Wang 已提交
42 43 44 45 46 47 48 49 50 51 52
static void splSetSubplanVgroups(SLogicSubplan* pSubplan, SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pNode)->pVgroupList);
  } else {
    if (1 == LIST_LENGTH(pNode->pChildren)) {
      splSetSubplanVgroups(pSubplan, (SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
    }
  }
}

static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SLogicNode* pNode, int32_t flag) {
53
  SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
X
Xiaoyu Wang 已提交
54 55 56
  if (NULL == pSubplan) {
    return NULL;
  }
57
  pSubplan->id.queryId = pCxt->queryId;
X
Xiaoyu Wang 已提交
58 59
  pSubplan->id.groupId = pCxt->groupId;
  pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
X
Xiaoyu Wang 已提交
60 61 62
  pSubplan->pNode = pNode;
  pSubplan->pNode->pParent = NULL;
  splSetSubplanVgroups(pSubplan, pNode);
63
  SPLIT_FLAG_SET_MASK(pSubplan->splitFlag, flag);
X
Xiaoyu Wang 已提交
64 65 66
  return pSubplan;
}

67
static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SExchangeLogicNode** pOutput) {
68
  SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
X
Xiaoyu Wang 已提交
69 70 71 72
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pExchange->srcGroupId = pCxt->groupId;
73 74
  pExchange->node.precision = pChild->precision;
  pExchange->node.pTargets = nodesCloneList(pChild->pTargets);
X
Xiaoyu Wang 已提交
75 76 77 78
  if (NULL == pExchange->node.pTargets) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

79 80 81 82
  *pOutput = pExchange;
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
83 84 85 86 87
static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
                                               ESubplanType subplanType) {
  SExchangeLogicNode* pExchange = NULL;
  int32_t             code = splCreateExchangeNode(pCxt, pSplitNode, &pExchange);
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
88
    code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pExchange);
X
Xiaoyu Wang 已提交
89 90 91 92
  }
  if (TSDB_CODE_SUCCESS == code) {
    pSubplan->subplanType = subplanType;
  } else {
93
    nodesDestroyNode((SNode*)pExchange);
X
Xiaoyu Wang 已提交
94 95
  }
  return code;
X
Xiaoyu Wang 已提交
96 97
}

98 99
static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag, FSplFindSplitNode func, void* pInfo) {
  if (!SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, flag)) {
100
    if (func(pCxt, pSubplan, pInfo)) {
101 102 103 104 105 106 107 108 109 110 111 112
      return true;
    }
  }
  SNode* pChild;
  FOREACH(pChild, pSubplan->pChildren) {
    if (splMatch(pCxt, (SLogicSubplan*)pChild, flag, func, pInfo)) {
      return true;
    }
  }
  return false;
}

X
Xiaoyu Wang 已提交
113
typedef struct SStableSplitInfo {
X
Xiaoyu Wang 已提交
114 115
  SLogicNode*    pSplitNode;
  SLogicSubplan* pSubplan;
X
Xiaoyu Wang 已提交
116 117
} SStableSplitInfo;

X
Xiaoyu Wang 已提交
118 119 120
static bool stbSplHasGatherExecFunc(const SNodeList* pFuncs) {
  SNode* pFunc = NULL;
  FOREACH(pFunc, pFuncs) {
121 122
    if (!fmIsWindowPseudoColumnFunc(((SFunctionNode*)pFunc)->funcId) &&
        !fmIsDistExecFunc(((SFunctionNode*)pFunc)->funcId)) {
X
Xiaoyu Wang 已提交
123 124 125 126 127 128
      return true;
    }
  }
  return false;
}

129 130
static bool stbSplIsMultiTbScan(bool streamQuery, SScanLogicNode* pScan) {
  return (NULL != pScan->pVgroupList && pScan->pVgroupList->numOfVgroups > 1) ||
X
Xiaoyu Wang 已提交
131
         (streamQuery && TSDB_SUPER_TABLE == pScan->tableType);
X
Xiaoyu Wang 已提交
132 133
}

134
static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) {
X
Xiaoyu Wang 已提交
135 136 137 138
  if (1 != LIST_LENGTH(pNode->pChildren)) {
    return false;
  }
  SNode* pChild = nodesListGetNode(pNode->pChildren, 0);
X
Xiaoyu Wang 已提交
139 140 141 142 143 144
  if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pChild)) {
    if (1 != LIST_LENGTH(((SLogicNode*)pChild)->pChildren)) {
      return false;
    }
    pChild = nodesListGetNode(((SLogicNode*)pChild)->pChildren, 0);
  }
145
  return (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild));
X
Xiaoyu Wang 已提交
146 147
}

148
static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
X
Xiaoyu Wang 已提交
149
  switch (nodeType(pNode)) {
X
Xiaoyu Wang 已提交
150 151 152 153
    case QUERY_NODE_LOGIC_PLAN_SCAN:
      return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
    case QUERY_NODE_LOGIC_PLAN_JOIN:
      return !(((SJoinLogicNode*)pNode)->isSingleTableJoin);
X
Xiaoyu Wang 已提交
154 155
    case QUERY_NODE_LOGIC_PLAN_AGG:
      return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
156 157 158 159 160 161 162
    case QUERY_NODE_LOGIC_PLAN_WINDOW: {
      SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode;
      if (WINDOW_TYPE_INTERVAL != pWindow->winType) {
        return false;
      }
      return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
    }
163 164
    case QUERY_NODE_LOGIC_PLAN_SORT:
      return stbSplHasMultiTbScan(streamQuery, pNode);
X
Xiaoyu Wang 已提交
165 166 167 168 169 170
    default:
      break;
  }
  return false;
}

171 172
static SLogicNode* stbSplMatchByNode(bool streamQuery, SLogicNode* pNode) {
  if (stbSplNeedSplit(streamQuery, pNode)) {
173 174 175 176
    return pNode;
  }
  SNode* pChild;
  FOREACH(pChild, pNode->pChildren) {
177
    SLogicNode* pSplitNode = stbSplMatchByNode(streamQuery, (SLogicNode*)pChild);
178 179 180 181 182 183 184
    if (NULL != pSplitNode) {
      return pSplitNode;
    }
  }
  return NULL;
}

185 186
static bool stbSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SStableSplitInfo* pInfo) {
  SLogicNode* pSplitNode = stbSplMatchByNode(pCxt->pPlanCxt->streamQuery, pSubplan->pNode);
187
  if (NULL != pSplitNode) {
X
Xiaoyu Wang 已提交
188
    pInfo->pSplitNode = pSplitNode;
189 190 191 192 193
    pInfo->pSubplan = pSubplan;
  }
  return NULL != pSplitNode;
}

X
Xiaoyu Wang 已提交
194 195 196 197 198 199 200 201
static int32_t stbSplRewriteFuns(const SNodeList* pFuncs, SNodeList** pPartialFuncs, SNodeList** pMergeFuncs) {
  SNode* pNode = NULL;
  FOREACH(pNode, pFuncs) {
    SFunctionNode* pFunc = (SFunctionNode*)pNode;
    SFunctionNode* pPartFunc = NULL;
    SFunctionNode* pMergeFunc = NULL;
    int32_t        code = TSDB_CODE_SUCCESS;
    if (fmIsWindowPseudoColumnFunc(pFunc->funcId)) {
202 203
      pPartFunc = (SFunctionNode*)nodesCloneNode(pNode);
      pMergeFunc = (SFunctionNode*)nodesCloneNode(pNode);
X
Xiaoyu Wang 已提交
204
      if (NULL == pPartFunc || NULL == pMergeFunc) {
205 206
        nodesDestroyNode((SNode*)pPartFunc);
        nodesDestroyNode((SNode*)pMergeFunc);
X
Xiaoyu Wang 已提交
207 208 209 210 211 212
        code = TSDB_CODE_OUT_OF_MEMORY;
      }
    } else {
      code = fmGetDistMethod(pFunc, &pPartFunc, &pMergeFunc);
    }
    if (TSDB_CODE_SUCCESS == code) {
213
      code = nodesListMakeStrictAppend(pPartialFuncs, (SNode*)pPartFunc);
X
Xiaoyu Wang 已提交
214 215
    }
    if (TSDB_CODE_SUCCESS == code) {
216
      code = nodesListMakeStrictAppend(pMergeFuncs, (SNode*)pMergeFunc);
X
Xiaoyu Wang 已提交
217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237
    }
    if (TSDB_CODE_SUCCESS != code) {
      nodesDestroyList(*pPartialFuncs);
      nodesDestroyList(*pMergeFuncs);
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

static int32_t stbSplAppendWStart(SNodeList* pFuncs, int32_t* pIndex) {
  int32_t index = 0;
  SNode*  pFunc = NULL;
  FOREACH(pFunc, pFuncs) {
    if (FUNCTION_TYPE_WSTARTTS == ((SFunctionNode*)pFunc)->funcType) {
      *pIndex = index;
      return TSDB_CODE_SUCCESS;
    }
    ++index;
  }

238
  SFunctionNode* pWStart = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
X
Xiaoyu Wang 已提交
239 240 241 242 243 244 245
  if (NULL == pWStart) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  strcpy(pWStart->functionName, "_wstartts");
  snprintf(pWStart->node.aliasName, sizeof(pWStart->node.aliasName), "%s.%p", pWStart->functionName, pWStart);
  int32_t code = fmGetFuncInfo(pWStart, NULL, 0);
  if (TSDB_CODE_SUCCESS == code) {
246
    code = nodesListStrictAppend(pFuncs, (SNode*)pWStart);
X
Xiaoyu Wang 已提交
247 248 249 250 251 252 253 254 255 256 257 258 259 260
  }
  *pIndex = index;
  return code;
}

static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogicNode** pPartWindow) {
  SNodeList* pFunc = pMergeWindow->pFuncs;
  pMergeWindow->pFuncs = NULL;
  SNodeList* pTargets = pMergeWindow->node.pTargets;
  pMergeWindow->node.pTargets = NULL;
  SNodeList* pChildren = pMergeWindow->node.pChildren;
  pMergeWindow->node.pChildren = NULL;

  int32_t           code = TSDB_CODE_SUCCESS;
261
  SWindowLogicNode* pPartWin = (SWindowLogicNode*)nodesCloneNode((SNode*)pMergeWindow);
X
Xiaoyu Wang 已提交
262 263 264 265 266 267 268 269 270 271 272 273 274 275
  if (NULL == pPartWin) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }

  if (TSDB_CODE_SUCCESS == code) {
    pMergeWindow->node.pTargets = pTargets;
    pPartWin->node.pChildren = pChildren;
    code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, &pMergeWindow->pFuncs);
  }
  int32_t index = 0;
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplAppendWStart(pPartWin->pFuncs, &index);
  }
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
276
    code = createColumnByRewriteExprs(pPartWin->pFuncs, &pPartWin->node.pTargets);
X
Xiaoyu Wang 已提交
277 278 279 280 281 282 283 284 285 286 287 288 289
  }
  if (TSDB_CODE_SUCCESS == code) {
    nodesDestroyNode(pMergeWindow->pTspk);
    pMergeWindow->pTspk = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index));
    if (NULL == pMergeWindow->pTspk) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  nodesDestroyList(pFunc);
  if (TSDB_CODE_SUCCESS == code) {
    *pPartWindow = (SLogicNode*)pPartWin;
  } else {
290
    nodesDestroyNode((SNode*)pPartWin);
X
Xiaoyu Wang 已提交
291 292 293 294 295
  }

  return code;
}

X
Xiaoyu Wang 已提交
296 297 298 299 300 301 302 303 304 305 306
static int32_t stbSplGetNumOfVgroups(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    return ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups;
  } else {
    if (1 == LIST_LENGTH(pNode->pChildren)) {
      return stbSplGetNumOfVgroups((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
    }
  }
  return 0;
}

X
Xiaoyu Wang 已提交
307 308
static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
                                     SNodeList* pMergeKeys, SLogicNode* pPartChild) {
309
  SMergeLogicNode* pMerge = (SMergeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE);
X
Xiaoyu Wang 已提交
310 311 312
  if (NULL == pMerge) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
313
  pMerge->numOfChannels = stbSplGetNumOfVgroups(pPartChild);
X
Xiaoyu Wang 已提交
314 315
  pMerge->srcGroupId = pCxt->groupId;
  pMerge->node.precision = pPartChild->precision;
X
Xiaoyu Wang 已提交
316
  pMerge->pMergeKeys = pMergeKeys;
X
Xiaoyu Wang 已提交
317 318 319

  int32_t code = TSDB_CODE_SUCCESS;
  pMerge->pInputs = nodesCloneList(pPartChild->pTargets);
320 321 322 323 324 325
  // NULL == pSubplan means 'merge node' replaces 'split node'.
  if (NULL == pSubplan) {
    pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets);
  } else {
    pMerge->node.pTargets = nodesCloneList(pSplitNode->pTargets);
  }
X
Xiaoyu Wang 已提交
326 327 328 329 330
  if (NULL == pMerge->node.pTargets || NULL == pMerge->pInputs) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }
  if (TSDB_CODE_SUCCESS == code) {
    if (NULL == pSubplan) {
331
      code = nodesListMakeAppend(&pSplitNode->pChildren, (SNode*)pMerge);
X
Xiaoyu Wang 已提交
332
    } else {
X
Xiaoyu Wang 已提交
333
      code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge);
X
Xiaoyu Wang 已提交
334 335 336
    }
  }
  if (TSDB_CODE_SUCCESS != code) {
337
    nodesDestroyNode((SNode*)pMerge);
X
Xiaoyu Wang 已提交
338
  }
X
Xiaoyu Wang 已提交
339
  return code;
X
Xiaoyu Wang 已提交
340 341
}

X
Xiaoyu Wang 已提交
342 343 344 345
static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent, SLogicNode* pPartChild) {
  SExchangeLogicNode* pExchange = NULL;
  int32_t             code = splCreateExchangeNode(pCxt, pPartChild, &pExchange);
  if (TSDB_CODE_SUCCESS == code) {
346
    code = nodesListMakeAppend(&pParent->pChildren, (SNode*)pExchange);
X
Xiaoyu Wang 已提交
347 348 349 350
  }
  return code;
}

X
Xiaoyu Wang 已提交
351
static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, SNodeList** pMergeKeys) {
352
  SOrderByExprNode* pMergeKey = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
353 354 355
  if (NULL == pMergeKey) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
X
Xiaoyu Wang 已提交
356
  pMergeKey->pExpr = nodesCloneNode(pPrimaryKey);
357
  if (NULL == pMergeKey->pExpr) {
358
    nodesDestroyNode((SNode*)pMergeKey);
359 360 361 362
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pMergeKey->order = ORDER_ASC;
  pMergeKey->nullOrder = NULL_ORDER_FIRST;
363
  return nodesListMakeStrictAppend(pMergeKeys, (SNode*)pMergeKey);
364 365
}

366
static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
X
Xiaoyu Wang 已提交
367 368 369
  SLogicNode* pPartWindow = NULL;
  int32_t     code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
370
    ((SWindowLogicNode*)pPartWindow)->intervalAlgo = INTERVAL_ALGO_HASH;
X
Xiaoyu Wang 已提交
371 372
    ((SWindowLogicNode*)pInfo->pSplitNode)->intervalAlgo = INTERVAL_ALGO_MERGE;
    SNodeList* pMergeKeys = NULL;
X
Xiaoyu Wang 已提交
373
    code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys);
X
Xiaoyu Wang 已提交
374 375 376 377 378 379
    if (TSDB_CODE_SUCCESS == code) {
      code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow);
    }
    if (TSDB_CODE_SUCCESS != code) {
      nodesDestroyList(pMergeKeys);
    }
X
Xiaoyu Wang 已提交
380 381 382
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
383
                                     (SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
X
Xiaoyu Wang 已提交
384 385 386 387 388
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  return code;
}

389
static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
390 391 392
  SLogicNode* pPartWindow = NULL;
  int32_t     code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
393 394
    ((SWindowLogicNode*)pPartWindow)->intervalAlgo = INTERVAL_ALGO_STREAM_SEMI;
    ((SWindowLogicNode*)pInfo->pSplitNode)->intervalAlgo = INTERVAL_ALGO_STREAM_FINAL;
395 396 397 398
    code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
399
                                     (SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
400 401 402 403 404
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  return code;
}

405
static int32_t stbSplSplitInterval(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
406
  if (pCxt->pPlanCxt->streamQuery) {
407
    return stbSplSplitIntervalForStream(pCxt, pInfo);
408
  } else {
409
    return stbSplSplitIntervalForBatch(pCxt, pInfo);
410 411 412
  }
}

413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428
static int32_t stbSplSplitSession(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
}

static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  switch (((SWindowLogicNode*)pInfo->pSplitNode)->winType) {
    case WINDOW_TYPE_INTERVAL:
      return stbSplSplitInterval(pCxt, pInfo);
    case WINDOW_TYPE_SESSION:
      return stbSplSplitSession(pCxt, pInfo);
    default:
      break;
  }
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
}

X
Xiaoyu Wang 已提交
429 430 431 432 433 434 435 436 437
static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) {
  SNodeList* pFunc = pMergeAgg->pAggFuncs;
  pMergeAgg->pAggFuncs = NULL;
  SNodeList* pGroupKeys = pMergeAgg->pGroupKeys;
  pMergeAgg->pGroupKeys = NULL;
  SNodeList* pTargets = pMergeAgg->node.pTargets;
  pMergeAgg->node.pTargets = NULL;
  SNodeList* pChildren = pMergeAgg->node.pChildren;
  pMergeAgg->node.pChildren = NULL;
X
Xiaoyu Wang 已提交
438 439
  SNode* pConditions = pMergeAgg->node.pConditions;
  pMergeAgg->node.pConditions = NULL;
X
Xiaoyu Wang 已提交
440 441

  int32_t        code = TSDB_CODE_SUCCESS;
442
  SAggLogicNode* pPartAgg = (SAggLogicNode*)nodesCloneNode((SNode*)pMergeAgg);
X
Xiaoyu Wang 已提交
443 444 445 446 447 448
  if (NULL == pPartAgg) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }

  if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
    pPartAgg->pGroupKeys = pGroupKeys;
X
Xiaoyu Wang 已提交
449
    code = createColumnByRewriteExprs(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets);
X
Xiaoyu Wang 已提交
450 451 452 453 454 455 456 457
  }
  if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
    pMergeAgg->pGroupKeys = nodesCloneList(pPartAgg->node.pTargets);
    if (NULL == pMergeAgg->pGroupKeys) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
458
    pMergeAgg->node.pConditions = pConditions;
X
Xiaoyu Wang 已提交
459 460 461 462 463 464
    pMergeAgg->node.pTargets = pTargets;
    pPartAgg->node.pChildren = pChildren;

    code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, &pMergeAgg->pAggFuncs);
  }
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
465
    code = createColumnByRewriteExprs(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets);
X
Xiaoyu Wang 已提交
466 467 468 469 470 471
  }

  nodesDestroyList(pFunc);
  if (TSDB_CODE_SUCCESS == code) {
    *pOutput = (SLogicNode*)pPartAgg;
  } else {
472
    nodesDestroyNode((SNode*)pPartAgg);
X
Xiaoyu Wang 已提交
473 474 475 476 477 478 479 480 481 482 483 484 485
  }

  return code;
}

static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pPartAgg = NULL;
  int32_t     code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
486
                                     (SNode*)splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT));
X
Xiaoyu Wang 已提交
487 488 489 490 491
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  return code;
}

X
Xiaoyu Wang 已提交
492
static SNode* stbSplCreateColumnNode(SExprNode* pExpr) {
493
  SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
X
Xiaoyu Wang 已提交
494 495 496 497 498 499 500 501 502 503 504 505 506
  if (NULL == pCol) {
    return NULL;
  }
  if (QUERY_NODE_COLUMN == nodeType(pExpr)) {
    strcpy(pCol->tableAlias, ((SColumnNode*)pExpr)->tableAlias);
  }
  strcpy(pCol->colName, pExpr->aliasName);
  strcpy(pCol->node.aliasName, pExpr->aliasName);
  pCol->node.resType = pExpr->resType;
  return (SNode*)pCol;
}

static SNode* stbSplCreateOrderByExpr(SOrderByExprNode* pSortKey, SNode* pCol) {
507
  SOrderByExprNode* pOutput = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
X
Xiaoyu Wang 已提交
508 509 510 511 512
  if (NULL == pOutput) {
    return NULL;
  }
  pOutput->pExpr = nodesCloneNode(pCol);
  if (NULL == pOutput->pExpr) {
513
    nodesDestroyNode((SNode*)pOutput);
X
Xiaoyu Wang 已提交
514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564
    return NULL;
  }
  pOutput->order = pSortKey->order;
  pOutput->nullOrder = pSortKey->nullOrder;
  return (SNode*)pOutput;
}

static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets, SNodeList** pOutput) {
  int32_t    code = TSDB_CODE_SUCCESS;
  SNodeList* pMergeKeys = NULL;
  SNode*     pNode = NULL;
  FOREACH(pNode, pSortKeys) {
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)pNode;
    SNode*            pTarget = NULL;
    bool              found = false;
    FOREACH(pTarget, pTargets) {
      if (0 == strcmp(((SExprNode*)pSortKey->pExpr)->aliasName, ((SColumnNode*)pTarget)->colName)) {
        code = nodesListMakeStrictAppend(&pMergeKeys, stbSplCreateOrderByExpr(pSortKey, pTarget));
        if (TSDB_CODE_SUCCESS != code) {
          break;
        }
        found = true;
      }
    }
    if (TSDB_CODE_SUCCESS == code && !found) {
      SNode* pCol = stbSplCreateColumnNode((SExprNode*)pSortKey->pExpr);
      code = nodesListMakeStrictAppend(&pMergeKeys, stbSplCreateOrderByExpr(pSortKey, pCol));
      if (TSDB_CODE_SUCCESS == code) {
        code = nodesListStrictAppend(pTargets, pCol);
      } else {
        nodesDestroyNode(pCol);
      }
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    *pOutput = pMergeKeys;
  } else {
    nodesDestroyList(pMergeKeys);
  }
  return code;
}

static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOutputPartSort,
                                        SNodeList** pOutputMergeKeys) {
  SNodeList* pSortKeys = pSort->pSortKeys;
  pSort->pSortKeys = NULL;
  SNodeList* pChildren = pSort->node.pChildren;
  pSort->node.pChildren = NULL;
X
Xiaoyu Wang 已提交
565 566

  int32_t         code = TSDB_CODE_SUCCESS;
567
  SSortLogicNode* pPartSort = (SSortLogicNode*)nodesCloneNode((SNode*)pSort);
X
Xiaoyu Wang 已提交
568 569 570 571
  if (NULL == pPartSort) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  }

X
Xiaoyu Wang 已提交
572
  SNodeList* pMergeKeys = NULL;
X
Xiaoyu Wang 已提交
573
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
574
    pPartSort->node.pChildren = pChildren;
X
Xiaoyu Wang 已提交
575
    pPartSort->pSortKeys = pSortKeys;
X
Xiaoyu Wang 已提交
576
    code = stbSplCreateMergeKeys(pPartSort->pSortKeys, pPartSort->node.pTargets, &pMergeKeys);
X
Xiaoyu Wang 已提交
577 578 579
  }

  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
580 581
    *pOutputPartSort = (SLogicNode*)pPartSort;
    *pOutputMergeKeys = pMergeKeys;
X
Xiaoyu Wang 已提交
582
  } else {
583
    nodesDestroyNode((SNode*)pPartSort);
X
Xiaoyu Wang 已提交
584
    nodesDestroyList(pMergeKeys);
X
Xiaoyu Wang 已提交
585 586 587 588 589 590 591
  }

  return code;
}

static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
  SLogicNode* pPartSort = NULL;
X
Xiaoyu Wang 已提交
592 593
  SNodeList*  pMergeKeys = NULL;
  int32_t     code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort, &pMergeKeys);
X
Xiaoyu Wang 已提交
594
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
595
    code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pPartSort);
X
Xiaoyu Wang 已提交
596 597 598
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
599
                                     (SNode*)splCreateScanSubplan(pCxt, pPartSort, SPLIT_FLAG_STABLE_SPLIT));
X
Xiaoyu Wang 已提交
600 601 602 603 604
  }
  pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
  return code;
}

X
Xiaoyu Wang 已提交
605
static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
606
  int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE);
X
Xiaoyu Wang 已提交
607 608
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
609
                                     (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
X
Xiaoyu Wang 已提交
610 611 612 613
  }
  return code;
}

X
Xiaoyu Wang 已提交
614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654
static SNode* stbSplFindPrimaryKeyFromScan(SScanLogicNode* pScan) {
  SNode* pCol = NULL;
  FOREACH(pCol, pScan->pScanCols) {
    if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pCol)->colId) {
      return pCol;
    }
  }
  return NULL;
}

static int32_t stbSplSplitScanNodeForJoin(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan) {
  SNodeList* pMergeKeys = NULL;
  int32_t    code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pScan), &pMergeKeys);
  if (TSDB_CODE_SUCCESS == code) {
    code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, (SLogicNode*)pScan);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = nodesListMakeStrictAppend(&pSubplan->pChildren,
                                     (SNode*)splCreateScanSubplan(pCxt, (SLogicNode*)pScan, SPLIT_FLAG_STABLE_SPLIT));
  }
  return code;
}

static int32_t stbSplSplitJoinNodeImpl(SSplitContext* pCxt, SLogicSubplan* pSubplan, SJoinLogicNode* pJoin) {
  int32_t code = TSDB_CODE_SUCCESS;
  SNode*  pChild = NULL;
  FOREACH(pChild, pJoin->node.pChildren) {
    if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild)) {
      code = stbSplSplitScanNodeForJoin(pCxt, pSubplan, (SScanLogicNode*)pChild);
    } else if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild)) {
      code = stbSplSplitJoinNodeImpl(pCxt, pSubplan, (SJoinLogicNode*)pChild);
    } else {
      code = TSDB_CODE_PLAN_INTERNAL_ERROR;
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }
  return code;
}

X
Xiaoyu Wang 已提交
655
static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
X
Xiaoyu Wang 已提交
656 657 658 659 660 661
  int32_t code = stbSplSplitJoinNodeImpl(pCxt, pInfo->pSubplan, (SJoinLogicNode*)pInfo->pSplitNode);
  if (TSDB_CODE_SUCCESS == code) {
    pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
    SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
  }
  return code;
X
Xiaoyu Wang 已提交
662 663
}

X
Xiaoyu Wang 已提交
664
static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
665 666 667 668
  if (pCxt->pPlanCxt->rSmaQuery) {
    return TSDB_CODE_SUCCESS;
  }

X
Xiaoyu Wang 已提交
669 670
  SStableSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_STABLE_SPLIT, (FSplFindSplitNode)stbSplFindSplitNode, &info)) {
671 672
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
673 674 675

  int32_t code = TSDB_CODE_SUCCESS;
  switch (nodeType(info.pSplitNode)) {
X
Xiaoyu Wang 已提交
676 677 678 679 680 681
    case QUERY_NODE_LOGIC_PLAN_SCAN:
      code = stbSplSplitScanNode(pCxt, &info);
      break;
    case QUERY_NODE_LOGIC_PLAN_JOIN:
      code = stbSplSplitJoinNode(pCxt, &info);
      break;
X
Xiaoyu Wang 已提交
682 683 684
    case QUERY_NODE_LOGIC_PLAN_AGG:
      code = stbSplSplitAggNode(pCxt, &info);
      break;
X
Xiaoyu Wang 已提交
685 686 687
    case QUERY_NODE_LOGIC_PLAN_WINDOW:
      code = stbSplSplitWindowNode(pCxt, &info);
      break;
X
Xiaoyu Wang 已提交
688 689 690
    case QUERY_NODE_LOGIC_PLAN_SORT:
      code = stbSplSplitSortNode(pCxt, &info);
      break;
X
Xiaoyu Wang 已提交
691 692
    default:
      break;
693
  }
X
Xiaoyu Wang 已提交
694

695 696 697 698 699
  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}

X
Xiaoyu Wang 已提交
700 701 702 703 704 705 706
typedef struct SSigTbJoinSplitInfo {
  SJoinLogicNode* pJoin;
  SLogicNode*     pSplitNode;
  SLogicSubplan*  pSubplan;
} SSigTbJoinSplitInfo;

static bool sigTbJoinSplNeedSplit(SJoinLogicNode* pJoin) {
X
Xiaoyu Wang 已提交
707 708 709 710 711
  if (!pJoin->isSingleTableJoin) {
    return false;
  }
  return QUERY_NODE_LOGIC_PLAN_EXCHANGE != nodeType(nodesListGetNode(pJoin->node.pChildren, 0)) &&
         QUERY_NODE_LOGIC_PLAN_EXCHANGE != nodeType(nodesListGetNode(pJoin->node.pChildren, 1));
712 713
}

X
Xiaoyu Wang 已提交
714 715
static SJoinLogicNode* sigTbJoinSplMatchByNode(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pNode) && sigTbJoinSplNeedSplit((SJoinLogicNode*)pNode)) {
X
Xiaoyu Wang 已提交
716
    return (SJoinLogicNode*)pNode;
717 718 719
  }
  SNode* pChild;
  FOREACH(pChild, pNode->pChildren) {
X
Xiaoyu Wang 已提交
720
    SJoinLogicNode* pSplitNode = sigTbJoinSplMatchByNode((SLogicNode*)pChild);
721 722 723 724 725 726 727
    if (NULL != pSplitNode) {
      return pSplitNode;
    }
  }
  return NULL;
}

728
static bool sigTbJoinSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SSigTbJoinSplitInfo* pInfo) {
X
Xiaoyu Wang 已提交
729
  SJoinLogicNode* pJoin = sigTbJoinSplMatchByNode(pSubplan->pNode);
X
Xiaoyu Wang 已提交
730 731
  if (NULL != pJoin) {
    pInfo->pJoin = pJoin;
732
    pInfo->pSplitNode = (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1);
733 734
    pInfo->pSubplan = pSubplan;
  }
X
Xiaoyu Wang 已提交
735
  return NULL != pJoin;
736 737
}

X
Xiaoyu Wang 已提交
738 739 740
static int32_t singleTableJoinSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SSigTbJoinSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)sigTbJoinSplFindSplitNode, &info)) {
X
Xiaoyu Wang 已提交
741 742
    return TSDB_CODE_SUCCESS;
  }
743
  int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType);
X
Xiaoyu Wang 已提交
744
  if (TSDB_CODE_SUCCESS == code) {
745
    code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, info.pSplitNode, 0));
X
Xiaoyu Wang 已提交
746 747 748 749 750 751
  }
  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}

752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785
static bool unionIsChildSubplan(SLogicNode* pLogicNode, int32_t groupId) {
  if (QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(pLogicNode)) {
    return ((SExchangeLogicNode*)pLogicNode)->srcGroupId == groupId;
  }

  SNode* pChild;
  FOREACH(pChild, pLogicNode->pChildren) {
    bool isChild = unionIsChildSubplan((SLogicNode*)pChild, groupId);
    if (isChild) {
      return isChild;
    }
  }
  return false;
}

static int32_t unionMountSubplan(SLogicSubplan* pParent, SNodeList* pChildren) {
  SNode* pChild = NULL;
  WHERE_EACH(pChild, pChildren) {
    if (unionIsChildSubplan(pParent->pNode, ((SLogicSubplan*)pChild)->id.groupId)) {
      int32_t code = nodesListMakeAppend(&pParent->pChildren, pChild);
      if (TSDB_CODE_SUCCESS == code) {
        REPLACE_NODE(NULL);
        ERASE_NODE(pChildren);
        continue;
      } else {
        return code;
      }
    }
    WHERE_NEXT;
  }
  return TSDB_CODE_SUCCESS;
}

static SLogicSubplan* unionCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode) {
786
  SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
787 788 789
  if (NULL == pSubplan) {
    return NULL;
  }
790
  pSubplan->id.queryId = pCxt->queryId;
791 792 793
  pSubplan->id.groupId = pCxt->groupId;
  pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
  pSubplan->pNode = pNode;
794
  pNode->pParent = NULL;
795 796 797 798 799 800 801 802 803 804 805 806
  return pSubplan;
}

static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubplan, SLogicNode* pSplitNode) {
  SNodeList* pSubplanChildren = pUnionSubplan->pChildren;
  pUnionSubplan->pChildren = NULL;

  int32_t code = TSDB_CODE_SUCCESS;

  SNode* pChild = NULL;
  FOREACH(pChild, pSplitNode->pChildren) {
    SLogicSubplan* pNewSubplan = unionCreateSubplan(pCxt, (SLogicNode*)pChild);
807
    code = nodesListMakeStrictAppend(&pUnionSubplan->pChildren, (SNode*)pNewSubplan);
808 809 810 811 812 813 814 815 816 817 818 819 820 821 822
    if (TSDB_CODE_SUCCESS == code) {
      REPLACE_NODE(NULL);
      code = unionMountSubplan(pNewSubplan, pSubplanChildren);
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    nodesDestroyList(pSubplanChildren);
    DESTORY_LIST(pSplitNode->pChildren);
  }
  return code;
}

X
Xiaoyu Wang 已提交
823 824 825 826 827
typedef struct SUnionAllSplitInfo {
  SProjectLogicNode* pProject;
  SLogicSubplan*     pSubplan;
} SUnionAllSplitInfo;

X
Xiaoyu Wang 已提交
828
static SLogicNode* unAllSplMatchByNode(SLogicNode* pNode) {
X
Xiaoyu Wang 已提交
829 830 831 832 833
  if (QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
    return pNode;
  }
  SNode* pChild;
  FOREACH(pChild, pNode->pChildren) {
X
Xiaoyu Wang 已提交
834
    SLogicNode* pSplitNode = unAllSplMatchByNode((SLogicNode*)pChild);
X
Xiaoyu Wang 已提交
835 836 837 838 839 840 841
    if (NULL != pSplitNode) {
      return pSplitNode;
    }
  }
  return NULL;
}

842
static bool unAllSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SUnionAllSplitInfo* pInfo) {
X
Xiaoyu Wang 已提交
843
  SLogicNode* pSplitNode = unAllSplMatchByNode(pSubplan->pNode);
X
Xiaoyu Wang 已提交
844 845 846 847 848 849 850
  if (NULL != pSplitNode) {
    pInfo->pProject = (SProjectLogicNode*)pSplitNode;
    pInfo->pSubplan = pSubplan;
  }
  return NULL != pSplitNode;
}

X
Xiaoyu Wang 已提交
851
static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SProjectLogicNode* pProject) {
852
  SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
X
Xiaoyu Wang 已提交
853 854 855 856
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pExchange->srcGroupId = pCxt->groupId;
X
Xiaoyu Wang 已提交
857
  pExchange->node.precision = pProject->node.precision;
X
Xiaoyu Wang 已提交
858 859 860 861 862 863 864
  pExchange->node.pTargets = nodesCloneList(pProject->node.pTargets);
  if (NULL == pExchange->node.pTargets) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pSubplan->subplanType = SUBPLAN_TYPE_MERGE;

X
Xiaoyu Wang 已提交
865 866
  if (NULL == pProject->node.pParent) {
    pSubplan->pNode = (SLogicNode*)pExchange;
867
    nodesDestroyNode((SNode*)pProject);
X
Xiaoyu Wang 已提交
868 869 870 871 872
    return TSDB_CODE_SUCCESS;
  }

  SNode* pNode;
  FOREACH(pNode, pProject->node.pParent->pChildren) {
873
    if (nodesEqualNode(pNode, (SNode*)pProject)) {
X
Xiaoyu Wang 已提交
874 875 876 877 878
      REPLACE_NODE(pExchange);
      nodesDestroyNode(pNode);
      return TSDB_CODE_SUCCESS;
    }
  }
879 880
  nodesDestroyNode((SNode*)pExchange);
  return TSDB_CODE_PLAN_INTERNAL_ERROR;
X
Xiaoyu Wang 已提交
881 882
}

X
Xiaoyu Wang 已提交
883 884
static int32_t unionAllSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SUnionAllSplitInfo info = {0};
X
Xiaoyu Wang 已提交
885
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unAllSplFindSplitNode, &info)) {
886 887
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
888

889
  int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pProject);
X
Xiaoyu Wang 已提交
890
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
891
    code = unAllSplCreateExchangeNode(pCxt, info.pSubplan, info.pProject);
X
Xiaoyu Wang 已提交
892 893
  }
  ++(pCxt->groupId);
894
  pCxt->split = true;
X
Xiaoyu Wang 已提交
895 896 897
  return code;
}

X
Xiaoyu Wang 已提交
898 899 900 901 902
typedef struct SUnionDistinctSplitInfo {
  SAggLogicNode* pAgg;
  SLogicSubplan* pSubplan;
} SUnionDistinctSplitInfo;

X
Xiaoyu Wang 已提交
903
static SLogicNode* unDistSplMatchByNode(SLogicNode* pNode) {
X
Xiaoyu Wang 已提交
904 905 906 907 908
  if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
    return pNode;
  }
  SNode* pChild;
  FOREACH(pChild, pNode->pChildren) {
X
Xiaoyu Wang 已提交
909
    SLogicNode* pSplitNode = unDistSplMatchByNode((SLogicNode*)pChild);
X
Xiaoyu Wang 已提交
910 911 912 913 914 915 916
    if (NULL != pSplitNode) {
      return pSplitNode;
    }
  }
  return NULL;
}

X
Xiaoyu Wang 已提交
917
static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SAggLogicNode* pAgg) {
918
  SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
X
Xiaoyu Wang 已提交
919 920 921 922
  if (NULL == pExchange) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pExchange->srcGroupId = pCxt->groupId;
X
Xiaoyu Wang 已提交
923
  pExchange->node.precision = pAgg->node.precision;
X
Xiaoyu Wang 已提交
924
  pExchange->node.pTargets = nodesCloneList(pAgg->pGroupKeys);
X
Xiaoyu Wang 已提交
925 926 927 928 929 930
  if (NULL == pExchange->node.pTargets) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pSubplan->subplanType = SUBPLAN_TYPE_MERGE;

931
  return nodesListMakeAppend(&pAgg->node.pChildren, (SNode*)pExchange);
X
Xiaoyu Wang 已提交
932 933
}

934
static bool unDistSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SUnionDistinctSplitInfo* pInfo) {
X
Xiaoyu Wang 已提交
935
  SLogicNode* pSplitNode = unDistSplMatchByNode(pSubplan->pNode);
X
Xiaoyu Wang 已提交
936 937 938 939 940 941 942
  if (NULL != pSplitNode) {
    pInfo->pAgg = (SAggLogicNode*)pSplitNode;
    pInfo->pSubplan = pSubplan;
  }
  return NULL != pSplitNode;
}

X
Xiaoyu Wang 已提交
943 944
static int32_t unionDistinctSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SUnionDistinctSplitInfo info = {0};
X
Xiaoyu Wang 已提交
945
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unDistSplFindSplitNode, &info)) {
X
Xiaoyu Wang 已提交
946 947 948
    return TSDB_CODE_SUCCESS;
  }

949
  int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pAgg);
X
Xiaoyu Wang 已提交
950
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
951
    code = unDistSplCreateExchangeNode(pCxt, info.pSubplan, info.pAgg);
X
Xiaoyu Wang 已提交
952 953 954 955 956 957
  }
  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}

X
Xiaoyu Wang 已提交
958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000
typedef struct SSmaIndexSplitInfo {
  SMergeLogicNode* pMerge;
  SLogicSubplan*   pSubplan;
} SSmaIndexSplitInfo;

static SLogicNode* smaIdxSplMatchByNode(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
    return pNode;
  }
  SNode* pChild;
  FOREACH(pChild, pNode->pChildren) {
    SLogicNode* pSplitNode = smaIdxSplMatchByNode((SLogicNode*)pChild);
    if (NULL != pSplitNode) {
      return pSplitNode;
    }
  }
  return NULL;
}

static bool smaIdxSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SSmaIndexSplitInfo* pInfo) {
  SLogicNode* pSplitNode = smaIdxSplMatchByNode(pSubplan->pNode);
  if (NULL != pSplitNode) {
    pInfo->pMerge = (SMergeLogicNode*)pSplitNode;
    pInfo->pSubplan = pSubplan;
  }
  return NULL != pSplitNode;
}

static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
  SSmaIndexSplitInfo info = {0};
  if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)smaIdxSplFindSplitNode, &info)) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pMerge);
  if (TSDB_CODE_SUCCESS == code) {
    info.pMerge->srcGroupId = pCxt->groupId;
  }
  ++(pCxt->groupId);
  pCxt->split = true;
  return code;
}

X
Xiaoyu Wang 已提交
1001 1002 1003 1004 1005
// clang-format off
static const SSplitRule splitRuleSet[] = {
  {.pName = "SuperTableSplit",      .splitFunc = stableSplit},
  {.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit},
  {.pName = "UnionAllSplit",        .splitFunc = unionAllSplit},
X
Xiaoyu Wang 已提交
1006 1007
  {.pName = "UnionDistinctSplit",   .splitFunc = unionDistinctSplit},
  {.pName = "SmaIndexSplit",        .splitFunc = smaIndexSplit}
X
Xiaoyu Wang 已提交
1008 1009
};
// clang-format on
X
Xiaoyu Wang 已提交
1010 1011 1012

static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));

1013 1014
static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) {
  char* pStr = NULL;
1015
  nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL);
1016 1017 1018 1019
  qDebugL("apply %s rule: %s", pRuleName, pStr);
  taosMemoryFree(pStr);
}

1020 1021 1022 1023
static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) {
  SSplitContext cxt = {
      .pPlanCxt = pCxt, .queryId = pSubplan->id.queryId, .groupId = pSubplan->id.groupId + 1, .split = false};
  bool split = false;
X
Xiaoyu Wang 已提交
1024
  do {
1025
    split = false;
X
Xiaoyu Wang 已提交
1026
    for (int32_t i = 0; i < splitRuleNum; ++i) {
1027
      cxt.split = false;
1028
      int32_t code = splitRuleSet[i].splitFunc(&cxt, pSubplan);
X
Xiaoyu Wang 已提交
1029 1030 1031
      if (TSDB_CODE_SUCCESS != code) {
        return code;
      }
1032 1033 1034 1035
      if (cxt.split) {
        split = true;
        dumpLogicSubplan(splitRuleSet[i].pName, pSubplan);
      }
X
Xiaoyu Wang 已提交
1036
    }
1037
  } while (split);
X
Xiaoyu Wang 已提交
1038 1039
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
1040

X
Xiaoyu Wang 已提交
1041 1042 1043 1044 1045 1046 1047 1048 1049 1050
static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    TSWAP(((SScanLogicNode*)pNode)->pVgroupList, pSubplan->pVgroupList);
    return;
  }

  SNode* pChild;
  FOREACH(pChild, pNode->pChildren) { setVgroupsInfo((SLogicNode*)pChild, pSubplan); }
}

X
Xiaoyu Wang 已提交
1051 1052 1053 1054
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
  if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pLogicSubplan->pNode)) {
    setVgroupsInfo(pLogicSubplan->pNode, pLogicSubplan);
    return TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
1055
  }
X
Xiaoyu Wang 已提交
1056 1057
  return applySplitRule(pCxt, pLogicSubplan);
}