planner.c 5.7 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14 15
 */

X
Xiaoyu Wang 已提交
16
#include "planner.h"
17

X
Xiaoyu Wang 已提交
18
#include "planInt.h"
X
Xiaoyu Wang 已提交
19
#include "scalar.h"
20
#include "tglobal.h"
X
Xiaoyu Wang 已提交
21

22 23 24 25 26 27 28 29
static void debugPrintNode(SNode* pNode) {
  char* pStr = NULL;
  nodesNodeToString(pNode, false, &pStr, NULL);
  printf("%s\n", pStr);
  taosMemoryFree(pStr);
  return;
}

X
Xiaoyu Wang 已提交
30
static void dumpQueryPlan(SQueryPlan* pPlan) {
31
  if (!tsQueryPlannerTrace) {
32 33
    return;
  }
X
Xiaoyu Wang 已提交
34
  char* pStr = NULL;
35
  nodesNodeToString((SNode*)pPlan, false, &pStr, NULL);
36
  planDebugL("QID:0x%" PRIx64 " Query Plan: %s", pPlan->queryId, pStr);
X
Xiaoyu Wang 已提交
37 38 39
  taosMemoryFree(pStr);
}

40
int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNodeList) {
X
Xiaoyu Wang 已提交
41
  SLogicSubplan*   pLogicSubplan = NULL;
X
Xiaoyu Wang 已提交
42 43
  SQueryLogicPlan* pLogicPlan = NULL;

44 45 46 47
  int32_t code = nodesAcquireAllocator(pCxt->allocatorId);
  if (TSDB_CODE_SUCCESS == code) {
    code = createLogicPlan(pCxt, &pLogicSubplan);
  }
X
Xiaoyu Wang 已提交
48
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
49
    code = optimizeLogicPlan(pCxt, pLogicSubplan);
X
Xiaoyu Wang 已提交
50
  }
X
Xiaoyu Wang 已提交
51
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
52
    code = splitLogicPlan(pCxt, pLogicSubplan);
X
Xiaoyu Wang 已提交
53 54
  }
  if (TSDB_CODE_SUCCESS == code) {
X
Xiaoyu Wang 已提交
55
    code = scaleOutLogicPlan(pCxt, pLogicSubplan, &pLogicPlan);
X
Xiaoyu Wang 已提交
56
  }
X
Xiaoyu Wang 已提交
57 58 59
  if (TSDB_CODE_SUCCESS == code) {
    code = createPhysiPlan(pCxt, pLogicPlan, pPlan, pExecNodeList);
  }
60 61 62
  if (TSDB_CODE_SUCCESS == code) {
    dumpQueryPlan(*pPlan);
  }
63
  nodesReleaseAllocator(pCxt->allocatorId);
X
Xiaoyu Wang 已提交
64

65 66
  nodesDestroyNode((SNode*)pLogicSubplan);
  nodesDestroyNode((SNode*)pLogicPlan);
X
Xiaoyu Wang 已提交
67
  terrno = code;
X
Xiaoyu Wang 已提交
68
  return code;
69 70
}

X
Xiaoyu Wang 已提交
71 72 73
static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDownstreamSourceNode* pSource) {
  if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pNode)) {
    SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pNode;
74
    if (groupId >= pExchange->srcStartGroupId && groupId <= pExchange->srcEndGroupId) {
75
      return nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, nodesCloneNode((SNode*)pSource));
X
Xiaoyu Wang 已提交
76 77 78 79 80 81 82 83 84 85
    }
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == nodeType(pNode)) {
    SMergePhysiNode* pMerge = (SMergePhysiNode*)pNode;
    if (pMerge->srcGroupId == groupId) {
      SExchangePhysiNode* pExchange =
          (SExchangePhysiNode*)nodesListGetNode(pMerge->node.pChildren, pMerge->numOfChannels - 1);
      if (1 == pMerge->numOfChannels) {
        pMerge->numOfChannels = LIST_LENGTH(pMerge->node.pChildren);
      } else {
        --(pMerge->numOfChannels);
X
Xiaoyu Wang 已提交
86
      }
87
      return nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, nodesCloneNode((SNode*)pSource));
X
Xiaoyu Wang 已提交
88 89 90 91 92 93 94 95 96 97 98
    }
  }

  SNode* pChild = NULL;
  FOREACH(pChild, pNode->pChildren) {
    if (TSDB_CODE_SUCCESS != setSubplanExecutionNode((SPhysiNode*)pChild, groupId, pSource)) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  return TSDB_CODE_SUCCESS;
}
H
Haojun Liao 已提交
99

X
Xiaoyu Wang 已提交
100
int32_t qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource) {
X
Xiaoyu Wang 已提交
101
  planDebug("QID:0x%" PRIx64 " set subplan execution node, groupId:%d", subplan->id.queryId, groupId);
X
Xiaoyu Wang 已提交
102
  return setSubplanExecutionNode(subplan->pNode, groupId, pSource);
X
Xiaoyu Wang 已提交
103
}
H
Haojun Liao 已提交
104

105 106 107 108 109 110 111 112 113 114 115 116 117
static void clearSubplanExecutionNode(SPhysiNode* pNode) {
  if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pNode)) {
    SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pNode;
    NODES_DESTORY_LIST(pExchange->pSrcEndPoints);
  } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == nodeType(pNode)) {
    SMergePhysiNode* pMerge = (SMergePhysiNode*)pNode;
    pMerge->numOfChannels = LIST_LENGTH(pMerge->node.pChildren);
    SNode* pChild = NULL;
    FOREACH(pChild, pMerge->node.pChildren) { NODES_DESTORY_LIST(((SExchangePhysiNode*)pChild)->pSrcEndPoints); }
  }

  SNode* pChild = NULL;
  FOREACH(pChild, pNode->pChildren) { clearSubplanExecutionNode((SPhysiNode*)pChild); }
118 119
}

X
Xiaoyu Wang 已提交
120 121 122 123
void qClearSubplanExecutionNode(SSubplan* pSubplan) {
  planDebug("QID:0x%" PRIx64 " clear subplan execution node, groupId:%d", pSubplan->id.queryId, pSubplan->id.groupId);
  clearSubplanExecutionNode(pSubplan->pNode);
}
124

X
Xiaoyu Wang 已提交
125
int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen) {
D
dapan1121 已提交
126
  if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType && NULL == pSubplan->pNode) {
X
Xiaoyu Wang 已提交
127 128 129
    SDataInserterNode* insert = (SDataInserterNode*)pSubplan->pDataSink;
    *pLen = insert->size;
    *pStr = insert->pData;
130 131 132
    insert->pData = NULL;
    return TSDB_CODE_SUCCESS;
  }
X
Xiaoyu Wang 已提交
133
  return nodesNodeToString((const SNode*)pSubplan, false, pStr, pLen);
X
Xiaoyu Wang 已提交
134
}
H
Haojun Liao 已提交
135

X
Xiaoyu Wang 已提交
136
int32_t qStringToSubplan(const char* pStr, SSubplan** pSubplan) { return nodesStringToNode(pStr, (SNode**)pSubplan); }
X
Xiaoyu Wang 已提交
137

138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
int32_t qSubPlanToMsg(const SSubplan* pSubplan, char** pStr, int32_t* pLen) {
  if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType && NULL == pSubplan->pNode) {
    SDataInserterNode* insert = (SDataInserterNode*)pSubplan->pDataSink;
    *pLen = insert->size;
    *pStr = insert->pData;
    insert->pData = NULL;
    return TSDB_CODE_SUCCESS;
  }
  return nodesNodeToMsg((const SNode*)pSubplan, pStr, pLen);
}

int32_t qMsgToSubplan(const char* pStr, int32_t len, SSubplan** pSubplan) {
  return nodesMsgToNode(pStr, len, (SNode**)pSubplan);
}

X
Xiaoyu Wang 已提交
153
SQueryPlan* qStringToQueryPlan(const char* pStr) {
X
Xiaoyu Wang 已提交
154 155 156 157 158
  SQueryPlan* pPlan = NULL;
  if (TSDB_CODE_SUCCESS != nodesStringToNode(pStr, (SNode**)&pPlan)) {
    return NULL;
  }
  return pPlan;
X
Xiaoyu Wang 已提交
159 160
}

161
void qDestroyQueryPlan(SQueryPlan* pPlan) { nodesDestroyNode((SNode*)pPlan); }