physicalPlan.c 15.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

X
Xiaoyu Wang 已提交
16 17
#if 0

18
#include "plannerInt.h"
X
Xiaoyu Wang 已提交
19
#include "exception.h"
20
#include "parser.h"
21

22 23 24
// Remember the context's current subplan in a local variable named `_`.
// The macro expands to a declaration, so it may appear at most once per scope.
// The argument is parenthesized so that any pointer expression (e.g. `&ctx`) expands correctly.
#define STORE_CURRENT_SUBPLAN(cxt) SSubplan* _ = (cxt)->pCurrentSubplan
// Put back the subplan saved by STORE_CURRENT_SUBPLAN earlier in the same scope.
#define RECOVERY_CURRENT_SUBPLAN(cxt) (cxt)->pCurrentSubplan = _

25 26 27 28 29 30 31
// Working state shared by the physical-plan builders in this file.
typedef struct SPlanContext {
  // Catalog handle passed in by the caller of createDag().
  struct SCatalog* pCatalog;
  // The DAG under construction; allocated in createDag().
  struct SQueryDag* pDag;
  // Subplan most recently created by initSubplan(); new subplans are linked as its children.
  SSubplan* pCurrentSubplan;
  // Id handed to the next subplan; initSubplan() bumps subplanId, the split/level builders bump templateId.
  SSubplanId nextId;
} SPlanContext;

32 33 34 35 36 37
// Operator-name table indexed by operator type; slot 0 is the "Unknown" placeholder.
// The remaining entries come from including plannerOp.h with INCLUDE_AS_NAME defined
// (presumably one string literal per operator - confirm against plannerOp.h).
static const char* gOpName[] = {
  "Unknown",
#define INCLUDE_AS_NAME
#include "plannerOp.h"
#undef INCLUDE_AS_NAME
};
X
Xiaoyu Wang 已提交
38

39
static void* validPointer(void* p) {
X
Xiaoyu Wang 已提交
40 41 42 43 44 45
  if (NULL == p) {
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
  }
  return p;
}

46 47 48 49
// Map an operator type to its printable name.
// Types outside the name table (negative or past the last entry) map to the
// table's slot 0, "Unknown", instead of reading out of bounds.
const char* opTypeToOpName(int32_t type) {
  const int32_t numOfOps = (int32_t)(sizeof(gOpName) / sizeof(gOpName[0]));
  if (type < 0 || type >= numOfOps) {
    return gOpName[0];
  }
  return gOpName[type];
}

50 51
// Reverse lookup of opTypeToOpName(): find the operator type whose name equals `name`.
// Slot 0 ("Unknown") is never matched; OP_Unknown is returned when nothing matches.
int32_t opNameToOpType(const char* name) {
  const int32_t numOfOps = (int32_t)(sizeof(gOpName) / sizeof(gOpName[0]));
  int32_t       type = 1;
  while (type < numOfOps) {
    if (strcmp(name, gOpName[type]) == 0) {
      return type;
    }
    ++type;
  }
  return OP_Unknown;
}
X
Xiaoyu Wang 已提交
58

59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
// Printable name of a data-sink type; anything other than Dispatch/Insert is "Unknown".
const char* dsinkTypeToDsinkName(int32_t type) {
  if (DSINK_Dispatch == type) {
    return "Dispatch";
  }
  if (DSINK_Insert == type) {
    return "Insert";
  }
  return "Unknown";
}

// Reverse of dsinkTypeToDsinkName(): exact, case-sensitive match on the sink name.
// Returns DSINK_Unknown for any other string.
int32_t dsinkNameToDsinkType(const char* name) {
  if (strcmp(name, "Dispatch") == 0) {
    return DSINK_Dispatch;
  }
  if (strcmp(name, "Insert") == 0) {
    return DSINK_Insert;
  }
  return DSINK_Unknown;
}

80 81 82 83 84 85 86 87 88 89
// Deep-copy a data block schema: the slot array is duplicated into freshly
// allocated memory and the scalar fields are copied over.
// Returns false (leaving dst->pSchema NULL) when the allocation fails.
static bool copySchema(SDataBlockSchema* dst, const SDataBlockSchema* src) {
  const size_t bytes = sizeof(SSlotSchema) * src->numOfCols;

  dst->pSchema = malloc(bytes);
  if (dst->pSchema == NULL) {
    return false;
  }
  memcpy(dst->pSchema, src->pSchema, bytes);

  dst->precision     = src->precision;
  dst->resultRowSize = src->resultRowSize;
  dst->numOfCols     = src->numOfCols;
  return true;
}

92
// Build the output schema of a logical plan node: one slot per expression,
// copied from the expression's result schema, with resultRowSize set to the
// sum of the slot widths.
// Returns false when the slot array cannot be allocated (numOfCols is already set by then).
static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) {
  dataBlockSchema->numOfCols = pPlanNode->numOfExpr;
  dataBlockSchema->pSchema = malloc(sizeof(SSlotSchema) * pPlanNode->numOfExpr);
  if (dataBlockSchema->pSchema == NULL) {
    return false;
  }

  dataBlockSchema->resultRowSize = 0;
  for (int32_t col = 0; col < pPlanNode->numOfExpr; ++col) {
    SExprInfo* pExpr = taosArrayGetP(pPlanNode->pExpr, col);
    memcpy(&dataBlockSchema->pSchema[col], &pExpr->base.resSchema, sizeof(SSlotSchema));
    dataBlockSchema->resultRowSize += dataBlockSchema->pSchema[col].bytes;
  }
  return true;
}

// Copy an expression array into a newly created array stored in *dst.
// A NULL or empty source is treated as success and leaves *dst untouched.
// Returns false when the array cannot be created or copyAllExprInfo() fails;
// in the latter case *dst still holds the new array.
static bool cloneExprArray(SArray** dst, SArray* src) {
  if (src == NULL) {
    return true;
  }

  const size_t numOfExprs = taosArrayGetSize(src);
  if (numOfExprs == 0) {
    return true;
  }

  *dst = taosArrayInit(numOfExprs, POINTER_BYTES);
  if (*dst == NULL) {
    return false;
  }

  return copyAllExprInfo(*dst, src, true) == TSDB_CODE_SUCCESS;
}

125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
// Allocate a zeroed data sink of `size` bytes and fill in its type and name.
// When a root physical node is given, its target schema is copied into the sink.
// Throws TSDB_CODE_TSC_OUT_OF_MEMORY on any allocation failure.
static SDataSink* initDataSink(int32_t type, int32_t size, const SPhyNode* pRoot) {
  SDataSink* pSink = (SDataSink*)validPointer(calloc(1, size));
  pSink->info.type = type;
  pSink->info.name = dsinkTypeToDsinkName(type);

  if (pRoot != NULL) {
    if (!copySchema(&pSink->schema, &pRoot->targetSchema)) {
      tfree(pSink);
      THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
    }
  }
  return pSink;
}

// Create an insert sink for one vgroup's data blocks.
// The payload buffer is swapped (not copied) from pBlocks into the sink, so after
// the call pBlocks->pData holds whatever the fresh sink held before the swap.
static SDataSink* createDataInserter(SPlanContext* pCxt, SVgDataBlocks* pBlocks, const SPhyNode* pRoot) {
  SDataSink*     pSink = initDataSink(DSINK_Insert, sizeof(SDataInserter), pRoot);
  SDataInserter* pInserter = (SDataInserter*)pSink;

  pInserter->size = pBlocks->size;
  pInserter->numOfTables = pBlocks->numOfTables;
  TSWAP(pInserter->pData, pBlocks->pData, char*);

  return pSink;
}

// Create a dispatch sink whose schema is copied from the root physical node.
// pCxt and pPlanNode are currently unused.
static SDataSink* createDataDispatcher(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, const SPhyNode* pRoot) {
  return initDataSink(DSINK_Dispatch, sizeof(SDataDispatcher), pRoot);
}

149
// Allocate a zeroed physical node of `size` bytes for the given operator type,
// clone the logical node's expressions into pTargets and derive the target schema.
// Throws TSDB_CODE_TSC_OUT_OF_MEMORY on failure; everything built so far is released
// first, the same way cleanupPhyNode() releases a finished node.
static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) {
  SPhyNode* node = (SPhyNode*)validPointer(calloc(1, size));
  node->info.type = type;
  node->info.name = opTypeToOpName(type);
  if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr) || !toDataBlockSchema(pPlanNode, &(node->targetSchema))) {
    // The expression array may exist (fully or partly filled) even though we failed;
    // freeing only the node would leak it.
    if (NULL != node->pTargets) {
      dropOneLevelExprInfo(node->pTargets);
    }
    tfree(node->targetSchema.pSchema);
    free(node);
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
  }
  return node;
}

160 161 162 163 164 165 166 167 168 169
// Release a physical node: its target expressions, its target schema and the node itself.
// A NULL node is ignored.
static void cleanupPhyNode(SPhyNode* pPhyNode) {
  if (NULL != pPhyNode) {
    dropOneLevelExprInfo(pPhyNode->pTargets);
    tfree(pPhyNode->targetSchema.pSchema);
    tfree(pPhyNode);
  }
}

170
// Create a scan node of `size` bytes and fill in the fields common to all scans:
// table uid and type from the table meta, a single pass (count = 1), ascending order.
static SPhyNode* initScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t type, int32_t size) {
  SScanPhyNode* pScan = (SScanPhyNode*)initPhyNode(pPlanNode, type, size);
  STableMeta*   pMeta = pTable->pMeta->pTableMeta;

  pScan->tableType = pMeta->tableType;
  pScan->uid       = pMeta->uid;
  pScan->order     = TSDB_ORDER_ASC;
  pScan->count     = 1;

  return (SPhyNode*)pScan;
}

// Create a plain SScanPhyNode for operator `op`; no fields beyond the common scan fields are set.
static SPhyNode* createPseudoScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) {
  const int32_t nodeSize = sizeof(SScanPhyNode);
  return initScanNode(pPlanNode, pTable, op, nodeSize);
}

186
// Create an OP_TagScan node; the table info is taken from the logical node's pExtInfo.
static SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) {
  return createPseudoScanNode(pPlanNode, (SQueryTableInfo*)pPlanNode->pExtInfo, OP_TagScan);
}

// Scan flag for a table scan. Placeholder: both arguments are ignored and
// MAIN_SCAN is always returned.
static uint8_t getScanFlag(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  (void)pPlanNode;
  (void)pTable;
  // todo
  return MAIN_SCAN;
}

H
Haojun Liao 已提交
196 197 198 199
// Create a table scan node for operator `op`, adding the scan flag and the
// query time window on top of the common scan fields.
static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pQueryTableInfo, int32_t op) {
  SPhyNode*          pBase = initScanNode(pPlanNode, pQueryTableInfo, op, sizeof(STableScanPhyNode));
  STableScanPhyNode* pScan = (STableScanPhyNode*)pBase;

  pScan->scanFlag = getScanFlag(pPlanNode, pQueryTableInfo);
  pScan->window   = pQueryTableInfo->window;
  // todo tag cond
  return pBase;
}


// Whether the table is a system table. Placeholder: always false.
static bool isSystemTable(SQueryTableInfo* pTable) {
  (void)pTable;
  // todo
  return false;
}

// Whether the plan node requires a sequential table scan. Placeholder: always false.
static bool needSeqScan(SQueryPlanNode* pPlanNode) {
  (void)pPlanNode;
  // todo
  return false;
}

// Pick the scan operator for a multi-vgroup table:
// system table -> OP_SystemTableScan, sequential scan -> OP_TableSeqScan,
// otherwise OP_TableScan for QNODE_TABLESCAN and OP_StreamScan for any other node type.
static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  if (isSystemTable(pTable)) {
    return createPseudoScanNode(pPlanNode, pTable, OP_SystemTableScan);
  }
  if (needSeqScan(pPlanNode)) {
    return createUserTableScanNode(pPlanNode, pTable, OP_TableSeqScan);
  }

  int32_t op = OP_StreamScan;
  if (QNODE_TABLESCAN == pPlanNode->info.type) {
    op = OP_TableScan;
  }
  return createUserTableScanNode(pPlanNode, pTable, op);
}

// Allocate a new subplan, give it the context's next id, link it below the
// context's current subplan (if any) and register it in the DAG at its level.
// On return the new subplan is the context's current subplan.
//
// Every allocation and every array push is checked: a failed push used to be
// ignored, which silently left the subplan out of the DAG or out of its
// parent's child list. Failures throw TSDB_CODE_TSC_OUT_OF_MEMORY via validPointer().
static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
  SSubplan* subplan = validPointer(calloc(1, sizeof(SSubplan)));
  subplan->id = pCxt->nextId;
  ++(pCxt->nextId.subplanId);

  subplan->type  = type;
  subplan->level = 0;
  if (NULL != pCxt->pCurrentSubplan) {
    // A subplan created while another one is current becomes its child, one level deeper.
    subplan->level = pCxt->pCurrentSubplan->level + 1;
    if (NULL == pCxt->pCurrentSubplan->pChildren) {
      pCxt->pCurrentSubplan->pChildren = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    }

    validPointer(taosArrayPush(pCxt->pCurrentSubplan->pChildren, &subplan));
    subplan->pParents = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    validPointer(taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan));
  }

  // pDag->pSubplans is an array of per-level arrays; create the level on first use.
  SArray* currentLevel;
  if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) {
    currentLevel = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    validPointer(taosArrayPush(pCxt->pDag->pSubplans, &currentLevel));
  } else {
    currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level);
  }

  validPointer(taosArrayPush(currentLevel, &subplan));
  pCxt->pCurrentSubplan = subplan;
  ++(pCxt->pDag->numOfSubplans);
  return subplan;
}

H
Haojun Liao 已提交
257 258 259
// Fill a query node address from a vgroup: the vgroup id becomes the node id
// and the endpoint set is copied by value.
static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAddr) {
  pNodeAddr->epset  = vg->epset;
  pNodeAddr->nodeId = vg->vgId;
}

262 263
// Create one scan subplan per vgroup of the table, each as a child of the
// subplan that is current on entry (the current subplan is restored after every
// iteration). Returns the template id in effect before the call and advances
// the context's template id by one.
static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo) {
  SVgroupsInfo* pVgroupList = pTableInfo->pMeta->vgroupList;

  for (int32_t vg = 0; vg < pVgroupList->numOfVgroups; ++vg) {
    SSubplan* pSaved = pCxt->pCurrentSubplan;

    SSubplan* pScan = initSubplan(pCxt, QUERY_TYPE_SCAN);
    pScan->msgType = TDMT_VND_QUERY;
    vgroupInfoToNodeAddr(&(pTableInfo->pMeta->vgroupList->vgroups[vg]), &pScan->execNode);
    pScan->pNode     = createMultiTableScanNode(pPlanNode, pTableInfo);
    pScan->pDataSink = createDataDispatcher(pCxt, pPlanNode, pScan->pNode);

    pCxt->pCurrentSubplan = pSaved;
  }

  uint64_t templateId = pCxt->nextId.templateId;
  pCxt->nextId.templateId++;
  return templateId;
}

277 278 279
// Create an exchange node bound to the given source template id, with an
// initially empty list of downstream sources (filled later by setExchangSourceNode()).
static SPhyNode* createExchangeNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, uint64_t srcTemplateId) {
  SPhyNode*         pBase = initPhyNode(pPlanNode, OP_Exchange, sizeof(SExchangePhyNode));
  SExchangePhyNode* pExchange = (SExchangePhyNode*)pBase;

  pExchange->srcTemplateId = srcTemplateId;
  pExchange->pSrcEndPoints = validPointer(taosArrayInit(TARRAY_MIN_SIZE, sizeof(SDownstreamSource)));
  return pBase;
}
283

284 285 286
// A scan is split across nodes exactly when the table is a super table.
static bool needMultiNodeScan(SQueryTableInfo* pTable) {
  // todo system table, for instance, user_tables
  const STableMeta* pMeta = pTable->pMeta->pTableMeta;
  return pMeta->tableType == TSDB_SUPER_TABLE;
}

H
Haojun Liao 已提交
289 290 291 292
// Create the scan node for a table that lives in a single vgroup and point the
// given subplan's execution node at that vgroup.
// TODO: the SVgroupInfo index (the first vgroup in the list is always used)
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo, SSubplan* subplan) {
  SVgroupsInfo* pVgroupsInfo = pTableInfo->pMeta->vgroupList;
  vgroupInfoToNodeAddr(&(pVgroupsInfo->vgroups[0]), &subplan->execNode);

  int32_t op = OP_StreamScan;
  if (QNODE_TABLESCAN == pPlanNode->info.type) {
    op = OP_TableScan;
  }
  return createUserTableScanNode(pPlanNode, pTableInfo, op);
}

297
// Translate a table/stream scan. A table that needs a multi-node scan is split
// into per-vgroup subplans fed through an exchange node; otherwise a single scan
// node is created inside the current subplan.
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;

  if (!needMultiNodeScan(pTable)) {
    return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan);
  }

  uint64_t srcTemplateId = splitSubplanByTable(pCxt, pPlanNode, pTable);
  return createExchangeNode(pCxt, pPlanNode, srcTemplateId);
}

306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324
// Create a final-stage aggregate node. Without a group-by clause the plain
// algorithm is used; with one, the hashed algorithm is selected and the
// group-by column list is duplicated into the node.
static SPhyNode* createSingleTableAgg(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SAggPhyNode*  pAgg = (SAggPhyNode*)initPhyNode(pPlanNode, OP_Aggregate, sizeof(SAggPhyNode));
  SGroupbyExpr* pGroupBy = (SGroupbyExpr*)pPlanNode->pExtInfo;

  pAgg->aggSplit = AGG_SPLIT_FINAL;
  if (pGroupBy == NULL) {
    pAgg->aggAlgo = AGG_ALGO_PLAIN;
  } else {
    pAgg->aggAlgo = AGG_ALGO_HASHED;
    pAgg->pGroupByList = validPointer(taosArrayDup(pGroupBy->columnInfo));
  }
  return (SPhyNode*)pAgg;
}

// Translate an aggregate/group-by node. Only the single-table form exists so far;
// the multi-node variant is not implemented.
static SPhyNode* createAggNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  // if (needMultiNodeAgg(pPlanNode)) {

  // }
  SPhyNode* pAgg = createSingleTableAgg(pCxt, pPlanNode);
  return pAgg;
}

325
// Recursively translate a logical plan node (and its children) into physical nodes.
//
// Returns the physical node, or NULL when the logical node has no physical
// operator (QNODE_MODIFY, or an unknown type when assertions are compiled out).
// Children that translate to NULL are skipped. Allocation failures throw
// TSDB_CODE_TSC_OUT_OF_MEMORY via validPointer().
static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SPhyNode* node = NULL;
  switch (pPlanNode->info.type) {
    case QNODE_TAGSCAN:
      node = createTagScanNode(pPlanNode);
      break;
    case QNODE_STREAMSCAN:
    case QNODE_TABLESCAN:
      node = createTableScanNode(pCxt, pPlanNode);
      break;
    case QNODE_AGGREGATE:
    case QNODE_GROUPBY:
      node = createAggNode(pCxt, pPlanNode);
      break;
    case QNODE_MODIFY:
      // Insert is not an operator in a physical plan.
      break;
    default:
      assert(false);
      break;
  }

  // No operator was produced, so there is nothing to hang children on;
  // previously this fell through and dereferenced the NULL node.
  if (NULL == node) {
    return NULL;
  }

  size_t numOfChildren = (NULL == pPlanNode->pChildren) ? 0 : taosArrayGetSize(pPlanNode->pChildren);
  if (numOfChildren > 0) {
    node->pChildren = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    for (size_t i = 0; i < numOfChildren; ++i) {
      SPhyNode* child = createPhyNode(pCxt, taosArrayGetP(pPlanNode->pChildren, i));
      if (NULL == child) {
        continue;
      }
      child->pParent = node;
      validPointer(taosArrayPush(node->pChildren, &child));
    }
  }

  return node;
}

H
Haojun Liao 已提交
359 360 361 362 363
// Create one modify subplan per payload entry of a modification (insert) node.
// Each subplan has no operator tree, only an insert sink that takes over the
// entry's data, and is addressed by the entry's vgroup endpoint set.
// The context's current subplan is restored after every entry.
static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SDataPayloadInfo* pInfo = (SDataPayloadInfo*)pPlanNode->pExtInfo;
  const size_t      numOfBlocks = taosArrayGetSize(pInfo->payload);

  for (int32_t idx = 0; idx < numOfBlocks; ++idx) {
    STORE_CURRENT_SUBPLAN(pCxt);

    SSubplan*      pModify = initSubplan(pCxt, QUERY_TYPE_MODIFY);
    SVgDataBlocks* pBlocks = (SVgDataBlocks*)taosArrayGetP(pInfo->payload, idx);

    pModify->execNode.epset = pBlocks->vg.epset;
    pModify->pDataSink      = createDataInserter(pCxt, pBlocks, NULL);
    pModify->pNode          = NULL;
    pModify->type           = QUERY_TYPE_MODIFY;
    pModify->msgType        = pInfo->msgType;
    pModify->id.queryId     = pCxt->pDag->queryId;

    RECOVERY_CURRENT_SUBPLAN(pCxt);
  }
}

379
// Build the subplans for a logical plan rooted at pRoot.
// A modification root is split into per-payload modify subplans; any other root
// gets one scan subplan holding the translated operator tree and a dispatch sink.
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
  if (pRoot->info.type == QNODE_MODIFY) {
    splitModificationOpSubPlan(pCxt, pRoot);
    return;
  }

  SSubplan* pTop = initSubplan(pCxt, QUERY_TYPE_SCAN);
  ++(pCxt->nextId.templateId);

  pTop->msgType   = TDMT_VND_QUERY;
  pTop->pNode     = createPhyNode(pCxt, pRoot);
  pTop->pDataSink = createDataDispatcher(pCxt, pRoot, pTop->pNode);
  // todo deal subquery
}

393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419
// Final fix-ups once the DAG has been built.
//
// Stream scan: when the root subplan's operator is an exchange, level 0 is
// removed from the DAG and the subplan count is decremented.
// NOTE(review): the removed level array and its subplan are not released here -
// confirm who owns them.
//
// Otherwise: when pNodeList is given, the execution node of every subplan in the
// last (deepest) level is appended to it. A DAG with no levels (e.g. a
// modification with an empty payload) is skipped rather than reading the "last"
// element of an empty array.
static void postCreateDag(SQueryPlanNode* pQueryNode, SQueryDag* pDag, SArray* pNodeList) {
  // The exchange operator is not necessary, in case of the stream scan.
  // Here we need to remove it from the DAG.
  if (pQueryNode->info.type == QNODE_STREAMSCAN) {
    SArray* pRootLevel = taosArrayGetP(pDag->pSubplans, 0);
    SSubplan *pSubplan = taosArrayGetP(pRootLevel, 0);

    if (pSubplan->pNode->info.type == OP_Exchange) {
      ASSERT(taosArrayGetSize(pRootLevel) == 1);

      taosArrayRemove(pDag->pSubplans, 0);
      // And then update the number of the subplans.
      pDag->numOfSubplans -= 1;
    }
  } else {
    // Traverse the dag again to acquire the execution node.
    if (pNodeList != NULL && taosArrayGetSize(pDag->pSubplans) > 0) {
      SArray** pSubLevel = taosArrayGetLast(pDag->pSubplans);
      size_t   num = taosArrayGetSize(*pSubLevel);
      for (size_t j = 0; j < num; ++j) {
        SSubplan* pPlan = taosArrayGetP(*pSubLevel, j);
        taosArrayPush(pNodeList, &pPlan->execNode);
      }
    }
  }
}

420
// Build the physical query DAG for a logical plan.
//
// *pDag receives the newly allocated DAG as soon as it exists, i.e. before the
// subplans are built. When anything inside the TRY block throws, the registered
// cleanups run, terrno is set to the thrown code and TSDB_CODE_FAILED is returned.
// On success postCreateDag() is applied and TSDB_CODE_SUCCESS is returned.
// NOTE(review): TSDB_MAX_TAG_CONDITIONS is an unusual argument for TRY - confirm it is intended.
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, SArray* pNodeList, uint64_t requestId) {
  TRY(TSDB_MAX_TAG_CONDITIONS) {
    SPlanContext cxt = {
      .pCatalog        = pCatalog,
      .pDag            = validPointer(calloc(1, sizeof(SQueryDag))),
      .pCurrentSubplan = NULL,
      // The unsigned Id starting from 1 would be better
      .nextId          = {.queryId = requestId, .subplanId = 1, .templateId = 1},
    };

    SQueryDag* pNewDag = cxt.pDag;
    *pDag = pNewDag;
    pNewDag->queryId = requestId;
    pNewDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));

    createSubplanByLevel(&cxt, pQueryNode);
  } CATCH(code) {
    CLEANUP_EXECUTE();
    terrno = code;
    return TSDB_CODE_FAILED;
  } END_TRY

  postCreateDag(pQueryNode, *pDag, pNodeList);
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
444

H
Haojun Liao 已提交
445
// Walk the operator tree rooted at pNode and append *pSource to the source list
// of every exchange node whose source template id equals templateId.
// A NULL node is ignored.
void setExchangSourceNode(uint64_t templateId, SDownstreamSource *pSource, SPhyNode* pNode) {
  if (pNode == NULL) {
    return;
  }

  if (pNode->info.type == OP_Exchange) {
    SExchangePhyNode* pExchange = (SExchangePhyNode*)pNode;
    if (pExchange->srcTemplateId == templateId) {
      taosArrayPush(pExchange->pSrcEndPoints, pSource);
    }
  }

  if (pNode->pChildren == NULL) {
    return;
  }

  const size_t numOfChildren = taosArrayGetSize(pNode->pChildren);
  for (int32_t c = 0; c < numOfChildren; ++c) {
    setExchangSourceNode(templateId, pSource, taosArrayGetP(pNode->pChildren, c));
  }
}

H
Haojun Liao 已提交
464 465
// Register a downstream source with every matching exchange node in the
// subplan's operator tree (see setExchangSourceNode()).
void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) {
  SPhyNode* pRoot = subplan->pNode;
  setExchangSourceNode(templateId, pSource, pRoot);
}
467 468 469 470 471 472

// Release a data sink. For a dispatch sink the copied schema is freed first;
// then the sink itself is freed. A NULL sink is ignored.
// NOTE(review): an insert sink's pData buffer is not freed here - confirm it is released elsewhere.
static void destroyDataSinkNode(SDataSink* pSinkNode) {
  if (NULL != pSinkNode) {
    if (DSINK_Dispatch == queryNodeType(pSinkNode)) {
      SDataDispatcher* pDispatcher = (SDataDispatcher*)pSinkNode;
      tfree(pDispatcher->sink.schema.pSchema);
    }
    tfree(pSinkNode);
  }
}

// Destroy a subplan: its operator tree root, its data sink, its parent/child
// link arrays (the linked subplans themselves are not touched) and the subplan.
// A NULL subplan is ignored.
void qDestroySubplan(SSubplan* pSubplan) {
  if (NULL != pSubplan) {
    taosArrayDestroy(pSubplan->pChildren);
    taosArrayDestroy(pSubplan->pParents);
    destroyDataSinkNode(pSubplan->pDataSink);
    cleanupPhyNode(pSubplan->pNode);
    tfree(pSubplan);
  }
}
X
Xiaoyu Wang 已提交
493 494

#endif