planScaleOut.c 6.5 KB
Newer Older
X
Xiaoyu Wang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "planInt.h"

typedef struct SScaleOutContext {
  SPlanContext* pPlanCxt;
X
Xiaoyu Wang 已提交
20
  int32_t       subplanId;
X
Xiaoyu Wang 已提交
21 22 23 24
} SScaleOutContext;

static SLogicSubplan* singleCloneSubLogicPlan(SScaleOutContext* pCxt, SLogicSubplan* pSrc, int32_t level) {
  SLogicSubplan* pDst = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
X
Xiaoyu Wang 已提交
25
  if (NULL == pDst) {
X
Xiaoyu Wang 已提交
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
    return NULL;
  }
  pDst->pNode = nodesCloneNode(pSrc->pNode);
  if (NULL == pDst->pNode) {
    nodesDestroyNode(pDst);
    return NULL;
  }
  pDst->subplanType = pSrc->subplanType;
  pDst->level = level;
  pDst->id.queryId = pSrc->id.queryId;
  pDst->id.groupId = pSrc->id.groupId;
  pDst->id.subplanId = pCxt->subplanId++;
  return pDst;
}

static int32_t scaleOutForModify(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
  SVnodeModifLogicNode* pNode = (SVnodeModifLogicNode*)pSubplan->pNode;
X
Xiaoyu Wang 已提交
43
  size_t                numOfVgroups = taosArrayGetSize(pNode->pDataBlocks);
X
Xiaoyu Wang 已提交
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
  for (int32_t i = 0; i < numOfVgroups; ++i) {
    SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
    if (NULL == pNewSubplan) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    ((SVnodeModifLogicNode*)pNewSubplan->pNode)->pVgDataBlocks = (SVgDataBlocks*)taosArrayGetP(pNode->pDataBlocks, i);
    if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pGroup, pNewSubplan)) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  return TSDB_CODE_SUCCESS;
}

static int32_t scaleOutForMerge(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
  return nodesListStrictAppend(pGroup, singleCloneSubLogicPlan(pCxt, pSubplan, level));
}

static int32_t doSetScanVgroup(SLogicNode* pNode, const SVgroupInfo* pVgroup, bool* pFound) {
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
    SScanLogicNode* pScan = (SScanLogicNode*)pNode;
wafwerar's avatar
wafwerar 已提交
64
    pScan->pVgroupList = taosMemoryCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo));
X
Xiaoyu Wang 已提交
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
    if (NULL == pScan->pVgroupList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    memcpy(pScan->pVgroupList->vgroups, pVgroup, sizeof(SVgroupInfo));
    *pFound = true;
    return TSDB_CODE_SUCCESS;
  }
  SNode* pChild = NULL;
  FOREACH(pChild, pNode->pChildren) {
    int32_t code = doSetScanVgroup((SLogicNode*)pChild, pVgroup, pFound);
    if (TSDB_CODE_SUCCESS != code || *pFound) {
      return code;
    }
  }
  return TSDB_CODE_SUCCESS;
}

static int32_t setScanVgroup(SLogicNode* pNode, const SVgroupInfo* pVgroup) {
  bool found = false;
  return doSetScanVgroup(pNode, pVgroup, &found);
}

static int32_t scaleOutForScan(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
X
bugfix  
Xiaoyu Wang 已提交
88
  if (pSubplan->pVgroupList && !pCxt->pPlanCxt->streamQuery) {
X
Xiaoyu Wang 已提交
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
    int32_t code = TSDB_CODE_SUCCESS;
    for (int32_t i = 0; i < pSubplan->pVgroupList->numOfVgroups; ++i) {
      SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
      if (NULL == pNewSubplan) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      code = setScanVgroup(pNewSubplan->pNode, pSubplan->pVgroupList->vgroups + i);
      if (TSDB_CODE_SUCCESS == code) {
        code = nodesListStrictAppend(pGroup, pNewSubplan);
      }
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
    return code;
  } else {
    return scaleOutForMerge(pCxt, pSubplan, level, pGroup);
  }
}

static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurrentGroup) {
  int32_t code = TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
111 112
  bool    topLevel = (0 == LIST_LENGTH(pParentsGroup));
  SNode*  pChild = NULL;
X
Xiaoyu Wang 已提交
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
  FOREACH(pChild, pCurrentGroup) {
    if (topLevel) {
      code = nodesListAppend(pParentsGroup, pChild);
    } else {
      SNode* pParent = NULL;
      FOREACH(pParent, pParentsGroup) {
        code = nodesListMakeAppend(&(((SLogicSubplan*)pParent)->pChildren), pChild);
        if (TSDB_CODE_SUCCESS == code) {
          code = nodesListMakeAppend(&(((SLogicSubplan*)pChild)->pParents), pParent);
        }
      }
    }
    if (TSDB_CODE_SUCCESS != code) {
      break;
    }
  }
  return code;
}

X
Xiaoyu Wang 已提交
132
static int32_t doScaleOut(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pParentsGroup) {
X
Xiaoyu Wang 已提交
133 134 135 136 137 138 139 140
  SNodeList* pCurrentGroup = nodesMakeList();
  if (NULL == pCurrentGroup) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t code = TSDB_CODE_SUCCESS;
  switch (pSubplan->subplanType) {
    case SUBPLAN_TYPE_MERGE:
X
Xiaoyu Wang 已提交
141
      code = scaleOutForMerge(pCxt, pSubplan, level, pCurrentGroup);
X
Xiaoyu Wang 已提交
142 143
      break;
    case SUBPLAN_TYPE_SCAN:
X
Xiaoyu Wang 已提交
144
      code = scaleOutForScan(pCxt, pSubplan, level, pCurrentGroup);
X
Xiaoyu Wang 已提交
145 146
      break;
    case SUBPLAN_TYPE_MODIFY:
X
Xiaoyu Wang 已提交
147
      code = scaleOutForModify(pCxt, pSubplan, level, pCurrentGroup);
X
Xiaoyu Wang 已提交
148 149 150 151 152 153 154 155 156 157 158 159
      break;
    default:
      break;
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = pushHierarchicalPlan(pParentsGroup, pCurrentGroup);
  }

  if (TSDB_CODE_SUCCESS == code) {
    SNode* pChild;
    FOREACH(pChild, pSubplan->pChildren) {
X
Xiaoyu Wang 已提交
160
      code = doScaleOut(pCxt, (SLogicSubplan*)pChild, level + 1, pCurrentGroup);
X
Xiaoyu Wang 已提交
161 162 163 164 165 166 167 168
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    nodesDestroyList(pCurrentGroup);
X
Xiaoyu Wang 已提交
169 170
  } else {
    nodesClearList(pCurrentGroup);
X
Xiaoyu Wang 已提交
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
  }

  return code;
}

static SQueryLogicPlan* makeQueryLogicPlan() {
  SQueryLogicPlan* pLogicPlan = (SQueryLogicPlan*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN);
  if (NULL == pLogicPlan) {
    return NULL;
  }
  pLogicPlan->pTopSubplans = nodesMakeList();
  if (NULL == pLogicPlan->pTopSubplans) {
    nodesDestroyNode(pLogicPlan);
    return NULL;
  }
  return pLogicPlan;
}

int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan) {
  SQueryLogicPlan* pPlan = makeQueryLogicPlan();
  if (NULL == pPlan) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

X
Xiaoyu Wang 已提交
195 196
  SScaleOutContext cxt = {.pPlanCxt = pCxt, .subplanId = 1};
  int32_t          code = doScaleOut(&cxt, pLogicSubplan, 0, pPlan->pTopSubplans);
X
Xiaoyu Wang 已提交
197 198 199 200 201 202 203 204
  if (TSDB_CODE_SUCCESS == code) {
    *pLogicPlan = pPlan;
  } else {
    nodesDestroyNode(pPlan);
  }

  return code;
}