/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "planInt.h"

/* State shared by every step of one scale-out pass over a logical subplan tree. */
typedef struct SScaleOutContext {
  SPlanContext* pPlanCxt;   /* planner context of the query (its streamQuery flag is consulted) */
  int32_t       subplanId;  /* next subplan id to assign; post-incremented for each clone made */
} SScaleOutContext;

/*
 * Creates one copy of pSrc to be placed at `level` of the scaled-out plan.
 *
 * The logical node tree is copied with nodesCloneNode. The subplan type and the
 * query/group ids come from pSrc, and a new subplan id is taken from pCxt.
 * pChildren/pParents are not set here; callers link the copy into the plan later.
 *
 * Returns the new subplan, or NULL if either allocation fails. On failure the partially
 * built subplan is destroyed and pCxt->subplanId is not advanced.
 */
static SLogicSubplan* singleCloneSubLogicPlan(SScaleOutContext* pCxt, SLogicSubplan* pSrc, int32_t level) {
  SLogicSubplan* pDst = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
  if (NULL == pDst) {
    return NULL;
  }
  pDst->pNode = (SLogicNode*)nodesCloneNode((SNode*)pSrc->pNode);
  if (NULL == pDst->pNode) {
    nodesDestroyNode((SNode*)pDst);
    return NULL;
  }
  pDst->subplanType = pSrc->subplanType;
  pDst->level = level;
  pDst->id.queryId = pSrc->id.queryId;
  pDst->id.groupId = pSrc->id.groupId;
  /* Ids are handed out only after cloning succeeded, so they stay dense. */
  pDst->id.subplanId = pCxt->subplanId++;
  return pDst;
}

/*
 * Searches the logical node tree rooted at pNode (node first, then children in order)
 * for the first scan node. That scan node gets a newly allocated one-entry vgroup list
 * holding a copy of *pVgroup, *pFound is set to true, and the search stops.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY if the list cannot be allocated, otherwise
 * TSDB_CODE_SUCCESS. If no scan node exists, the result is still TSDB_CODE_SUCCESS and
 * *pFound is not written.
 */
static int32_t doSetScanVgroup(SLogicNode* pNode, const SVgroupInfo* pVgroup, bool* pFound) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    SScanLogicNode* pScan = (SScanLogicNode*)pNode;
    /* NOTE(review): any pVgroupList this (cloned) scan node already holds is overwritten
     * here without being freed — confirm whether nodesCloneNode copies that list. */
    pScan->pVgroupList = taosMemoryCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo));
    if (NULL == pScan->pVgroupList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    /* The allocation holds exactly one trailing SVgroupInfo. Record that in the count so the
     * list is self-consistent; calloc alone would leave numOfVgroups at 0. */
    pScan->pVgroupList->numOfVgroups = 1;
    memcpy(pScan->pVgroupList->vgroups, pVgroup, sizeof(SVgroupInfo));
    *pFound = true;
    return TSDB_CODE_SUCCESS;
  }
  SNode* pChild = NULL;
  FOREACH(pChild, pNode->pChildren) {
    int32_t code = doSetScanVgroup((SLogicNode*)pChild, pVgroup, pFound);
    if (TSDB_CODE_SUCCESS != code || *pFound) {
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

/*
 * Binds the first scan node in the tree rooted at pNode to the single vgroup *pVgroup.
 * A tree with no scan node is left untouched and reported as TSDB_CODE_SUCCESS.
 */
static int32_t setScanVgroup(SLogicNode* pNode, const SVgroupInfo* pVgroup) {
  bool    scanFound = false;
  int32_t code = doSetScanVgroup(pNode, pVgroup, &scanFound);
  return code;
}

X
Xiaoyu Wang 已提交
67 68 69 70 71 72 73 74 75
static int32_t scaleOutByVgroups(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
  int32_t code = TSDB_CODE_SUCCESS;
  for (int32_t i = 0; i < pSubplan->pVgroupList->numOfVgroups; ++i) {
    SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
    if (NULL == pNewSubplan) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    code = setScanVgroup(pNewSubplan->pNode, pSubplan->pVgroupList->vgroups + i);
    if (TSDB_CODE_SUCCESS == code) {
76
      code = nodesListStrictAppend(pGroup, (SNode*)pNewSubplan);
X
Xiaoyu Wang 已提交
77 78 79 80 81 82 83 84
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }
  return code;
}

85 86 87 88 89 90
/*
 * No fan-out: appends exactly one clone of pSubplan to pGroup.
 * NOTE(review): a NULL clone (allocation failure) is passed straight to
 * nodesListStrictAppend, presumably reported as an error there — confirm.
 */
static int32_t scaleOutForMerge(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
  SNode* pClone = (SNode*)singleCloneSubLogicPlan(pCxt, pSubplan, level);
  return nodesListStrictAppend(pGroup, pClone);
}

/*
 * Scales out an insert whose modify node has no children: one clone of pSubplan per
 * element of the modify node's pDataBlocks array. Each clone's modify node points
 * (pVgDataBlocks) at the corresponding array element, which is not copied.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY if a clone cannot be made, the append error code if a
 * clone cannot be added to pGroup, and TSDB_CODE_SUCCESS otherwise.
 */
static int32_t scaleOutForInsertValues(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level,
                                       SNodeList* pGroup) {
  SVnodeModifyLogicNode* pNode = (SVnodeModifyLogicNode*)pSubplan->pNode;
  size_t                 numOfVgroups = taosArrayGetSize(pNode->pDataBlocks);
  /* size_t index: matches the count's type and avoids a signed/unsigned comparison. */
  for (size_t i = 0; i < numOfVgroups; ++i) {
    SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
    if (NULL == pNewSubplan) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    ((SVnodeModifyLogicNode*)pNewSubplan->pNode)->pVgDataBlocks = (SVgDataBlocks*)taosArrayGetP(pNode->pDataBlocks, i);
    /* Propagate the real error code, as the other scale-out helpers do. */
    int32_t code = nodesListStrictAppend(pGroup, (SNode*)pNewSubplan);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

/*
 * Insert scale-out. A modify node without children (a values insert) is split per data
 * block; a modify node that has children is emitted as a single clone.
 */
static int32_t scaleOutForInsert(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
  SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pSubplan->pNode;
  bool                   hasChildren = (NULL != pModify->node.pChildren);
  return hasChildren ? scaleOutForMerge(pCxt, pSubplan, level, pGroup)
                     : scaleOutForInsertValues(pCxt, pSubplan, level, pGroup);
}

/*
 * Scale-out for MODIFY subplans. Deletes fan out per vgroup; every other modify type goes
 * through the insert path.
 */
static int32_t scaleOutForModify(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
  SVnodeModifyLogicNode* pNode = (SVnodeModifyLogicNode*)pSubplan->pNode;
  if (MODIFY_TABLE_TYPE_DELETE == pNode->modifyType) {
    /* NOTE(review): unlike scaleOutForScan, pSubplan->pVgroupList is not NULL-checked before
     * scaleOutByVgroups dereferences it — assumes delete subplans always carry one; confirm. */
    return scaleOutByVgroups(pCxt, pSubplan, level, pGroup);
  }
  return scaleOutForInsert(pCxt, pSubplan, level, pGroup);
}

/*
 * Scale-out for SCAN subplans. If a vgroup list is known and the query is not a stream
 * query, one clone is made per vgroup; otherwise a single clone is emitted.
 */
static int32_t scaleOutForScan(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
  if (NULL != pSubplan->pVgroupList && !pCxt->pPlanCxt->streamQuery) {
    return scaleOutByVgroups(pCxt, pSubplan, level, pGroup);
  }
  return scaleOutForMerge(pCxt, pSubplan, level, pGroup);
}

/*
 * Scale-out for COMPUTE subplans: appends pSubplan->numOfComputeNodes clones to pGroup.
 * Stops at the first failure. A failed clone yields TSDB_CODE_OUT_OF_MEMORY; a failed
 * append yields the append's error code.
 */
static int32_t scaleOutForCompute(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t copies = 0;
  while (TSDB_CODE_SUCCESS == code && copies < pSubplan->numOfComputeNodes) {
    SLogicSubplan* pCopy = singleCloneSubLogicPlan(pCxt, pSubplan, level);
    if (NULL == pCopy) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    code = nodesListStrictAppend(pGroup, (SNode*)pCopy);
    ++copies;
  }
  return code;
}

/*
 * Links a group under a compute parents group one-to-one. The i-th current subplan
 * becomes a child of the i-th parent, and that parent is recorded in the child's
 * pParents. Pairing stops when either list runs out. Returns the first error.
 */
static int32_t pushHierarchicalPlanForCompute(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
  SNode* pChild = NULL;
  SNode* pParent = NULL;
  FORBOTH(pChild, pCurrentGroup, pParent, pParentsGroup) {
    SLogicSubplan* pParentPlan = (SLogicSubplan*)pParent;
    SLogicSubplan* pChildPlan = (SLogicSubplan*)pChild;
    int32_t        code = nodesListMakeAppend(&pParentPlan->pChildren, pChild);
    if (TSDB_CODE_SUCCESS == code) {
      code = nodesListMakeAppend(&pChildPlan->pParents, pParent);
    }
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

/* True when pGroup is non-empty and its first subplan is of type SUBPLAN_TYPE_COMPUTE. */
static bool isComputeGroup(SNodeList* pGroup) {
  if (0 != LIST_LENGTH(pGroup)) {
    SLogicSubplan* pFirst = (SLogicSubplan*)nodesListGetNode(pGroup, 0);
    return SUBPLAN_TYPE_COMPUTE == pFirst->subplanType;
  }
  return false;
}

/*
 * Links pCurrentGroup under a non-compute parents group.
 *
 * - Empty pParentsGroup (top level): each current subplan is appended to pParentsGroup
 *   itself.
 * - Otherwise each current subplan becomes a child of every parent, and every parent is
 *   added to that subplan's pParents.
 *
 * Stops and returns at the first error.
 */
static int32_t pushHierarchicalPlanForNormal(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
  int32_t code = TSDB_CODE_SUCCESS;
  bool    topLevel = (0 == LIST_LENGTH(pParentsGroup));
  SNode*  pChild = NULL;
  FOREACH(pChild, pCurrentGroup) {
    if (topLevel) {
      code = nodesListAppend(pParentsGroup, pChild);
    } else {
      SNode* pParent = NULL;
      FOREACH(pParent, pParentsGroup) {
        code = nodesListMakeAppend(&(((SLogicSubplan*)pParent)->pChildren), pChild);
        if (TSDB_CODE_SUCCESS == code) {
          code = nodesListMakeAppend(&(((SLogicSubplan*)pChild)->pParents), pParent);
        }
        if (TSDB_CODE_SUCCESS != code) {
          /* Stop here; otherwise a later successful parent would overwrite (mask) this error. */
          break;
        }
      }
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }
  return code;
}

/*
 * Attaches pCurrentGroup beneath pParentsGroup. Compute parents are paired one-to-one;
 * any other parents group uses the all-to-all / top-level linking.
 */
static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
  bool parentsAreCompute = isComputeGroup(pParentsGroup);
  return parentsAreCompute ? pushHierarchicalPlanForCompute(pParentsGroup, pCurrentGroup)
                           : pushHierarchicalPlanForNormal(pParentsGroup, pCurrentGroup);
}

X
Xiaoyu Wang 已提交
198
static int32_t doScaleOut(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pParentsGroup) {
X
Xiaoyu Wang 已提交
199 200 201 202 203 204 205 206
  SNodeList* pCurrentGroup = nodesMakeList();
  if (NULL == pCurrentGroup) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  switch (pSubplan->subplanType) {
    case SUBPLAN_TYPE_MERGE:
X
Xiaoyu Wang 已提交
207
      code = scaleOutForMerge(pCxt, pSubplan, level, pCurrentGroup);
X
Xiaoyu Wang 已提交
208 209
      break;
    case SUBPLAN_TYPE_SCAN:
X
Xiaoyu Wang 已提交
210
      code = scaleOutForScan(pCxt, pSubplan, level, pCurrentGroup);
X
Xiaoyu Wang 已提交
211 212
      break;
    case SUBPLAN_TYPE_MODIFY:
X
Xiaoyu Wang 已提交
213
      code = scaleOutForModify(pCxt, pSubplan, level, pCurrentGroup);
X
Xiaoyu Wang 已提交
214
      break;
215 216 217
    case SUBPLAN_TYPE_COMPUTE:
      code = scaleOutForCompute(pCxt, pSubplan, level, pCurrentGroup);
      break;
X
Xiaoyu Wang 已提交
218 219 220 221 222 223 224 225 226 227 228
    default:
      break;
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = pushHierarchicalPlan(pParentsGroup, pCurrentGroup);
  }

  if (TSDB_CODE_SUCCESS == code) {
    SNode* pChild;
    FOREACH(pChild, pSubplan->pChildren) {
X
Xiaoyu Wang 已提交
229
      code = doScaleOut(pCxt, (SLogicSubplan*)pChild, level + 1, pCurrentGroup);
X
Xiaoyu Wang 已提交
230 231 232 233 234 235 236 237
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyList(pCurrentGroup);
X
Xiaoyu Wang 已提交
238 239
  } else {
    nodesClearList(pCurrentGroup);
X
Xiaoyu Wang 已提交
240 241 242 243 244 245 246 247 248 249 250 251
  }

  return code;
}

static SQueryLogicPlan* makeQueryLogicPlan() {
  SQueryLogicPlan* pLogicPlan = (SQueryLogicPlan*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN);
  if (NULL == pLogicPlan) {
    return NULL;
  }
  pLogicPlan->pTopSubplans = nodesMakeList();
  if (NULL == pLogicPlan->pTopSubplans) {
252
    nodesDestroyNode((SNode*)pLogicPlan);
X
Xiaoyu Wang 已提交
253 254 255 256 257 258 259 260 261 262 263
    return NULL;
  }
  return pLogicPlan;
}

/*
 * Scales the logical subplan tree rooted at pLogicSubplan out into a new SQueryLogicPlan
 * whose top-level subplans are collected in pTopSubplans. Subplan ids start at 1.
 *
 * On success the new plan is stored in *pLogicPlan and the caller becomes responsible for
 * it. On failure the partially built plan is destroyed, *pLogicPlan is not written, and
 * the error code is returned.
 */
int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan) {
  SQueryLogicPlan* pPlan = makeQueryLogicPlan();
  if (NULL == pPlan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SScaleOutContext cxt = {.pPlanCxt = pCxt, .subplanId = 1};
  int32_t          code = doScaleOut(&cxt, pLogicSubplan, 0, pPlan->pTopSubplans);
  if (TSDB_CODE_SUCCESS == code) {
    *pLogicPlan = pPlan;
  } else {
    nodesDestroyNode((SNode*)pPlan);
  }

  return code;
}