physicalPlan.c 15.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "plannerInt.h"
S
Shengliang Guan 已提交
17
#include "texception.h"
18
#include "parser.h"
19

20 21 22
#define STORE_CURRENT_SUBPLAN(cxt) SSubplan* _ = cxt->pCurrentSubplan
#define RECOVERY_CURRENT_SUBPLAN(cxt) cxt->pCurrentSubplan = _

23 24 25 26 27 28 29
typedef struct SPlanContext {
  struct SCatalog* pCatalog;
  struct SQueryDag* pDag;
  SSubplan* pCurrentSubplan;
  SSubplanId nextId;
} SPlanContext;

30 31 32 33 34 35
static const char* gOpName[] = {
  "Unknown",
#define INCLUDE_AS_NAME
#include "plannerOp.h"
#undef INCLUDE_AS_NAME
};
X
Xiaoyu Wang 已提交
36

37
static void* validPointer(void* p) {
X
Xiaoyu Wang 已提交
38 39 40 41 42 43
  if (NULL == p) {
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
  }
  return p;
}

44 45 46 47
const char* opTypeToOpName(int32_t type) {
  return gOpName[type];
}

48 49
int32_t opNameToOpType(const char* name) {
  for (int32_t i = 1; i < sizeof(gOpName) / sizeof(gOpName[0]); ++i) {
50
    if (0 == strcmp(name, gOpName[i])) {
51 52 53 54 55
      return i;
    }
  }
  return OP_Unknown;
}
X
Xiaoyu Wang 已提交
56

57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
const char* dsinkTypeToDsinkName(int32_t type) {
  switch (type) {
    case DSINK_Dispatch:
      return "Dispatch";
    case DSINK_Insert:
      return "Insert";
    default:
      break;
  }
  return "Unknown";
}

int32_t dsinkNameToDsinkType(const char* name) {
  if (0 == strcmp(name, "Dispatch")) {
    return DSINK_Dispatch;
  } else if (0 == strcmp(name, "Insert")) {
    return DSINK_Insert;
  }
  return DSINK_Unknown;
}

78 79 80 81 82 83 84 85 86 87
static bool copySchema(SDataBlockSchema* dst, const SDataBlockSchema* src) {
  dst->pSchema = malloc(sizeof(SSlotSchema) * src->numOfCols);
  if (NULL == dst->pSchema) {
    return false;
  }
  memcpy(dst->pSchema, src->pSchema, sizeof(SSlotSchema) * src->numOfCols);
  dst->numOfCols = src->numOfCols;
  dst->resultRowSize = src->resultRowSize;
  dst->precision = src->precision;
  return true;
X
Xiaoyu Wang 已提交
88 89
}

90
static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) {
91
  dataBlockSchema->numOfCols = pPlanNode->numOfExpr;
92
  dataBlockSchema->pSchema = malloc(sizeof(SSlotSchema) * pPlanNode->numOfExpr);
93 94 95
  if (NULL == dataBlockSchema->pSchema) {
    return false;
  }
96

97
  dataBlockSchema->resultRowSize = 0;
98 99 100 101
  for (int32_t i = 0; i < pPlanNode->numOfExpr; ++i) {
    SExprInfo* pExprInfo = taosArrayGetP(pPlanNode->pExpr, i);
    memcpy(&dataBlockSchema->pSchema[i], &pExprInfo->base.resSchema, sizeof(SSlotSchema));

102 103
    dataBlockSchema->resultRowSize += dataBlockSchema->pSchema[i].bytes;
  }
104

105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
  return true;
}

static bool cloneExprArray(SArray** dst, SArray* src) {
  if (NULL == src) {
    return true;
  }
  size_t size = taosArrayGetSize(src);
  if (0 == size) {
    return true;
  }
  *dst = taosArrayInit(size, POINTER_BYTES);
  if (NULL == *dst) {
    return false;
  }
  return (TSDB_CODE_SUCCESS == copyAllExprInfo(*dst, src, true) ? true : false);
X
Xiaoyu Wang 已提交
121 122
}

123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
static SDataSink* initDataSink(int32_t type, int32_t size, const SPhyNode* pRoot) {
  SDataSink* sink = (SDataSink*)validPointer(calloc(1, size));
  sink->info.type = type;
  sink->info.name = dsinkTypeToDsinkName(type);
  if (NULL !=pRoot && !copySchema(&sink->schema, &pRoot->targetSchema)) {
    tfree(sink);
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
  }
  return sink;
}

static SDataSink* createDataInserter(SPlanContext* pCxt, SVgDataBlocks* pBlocks, const SPhyNode* pRoot) {
  SDataInserter* inserter = (SDataInserter*)initDataSink(DSINK_Insert, sizeof(SDataInserter), pRoot);
  inserter->numOfTables = pBlocks->numOfTables;
  inserter->size = pBlocks->size;
dengyihao's avatar
dengyihao 已提交
138
  TSWAP(inserter->pData, pBlocks->pData, char*);
139 140 141 142 143 144 145 146
  return (SDataSink*)inserter;
}

static SDataSink* createDataDispatcher(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, const SPhyNode* pRoot) {
  SDataDispatcher* dispatcher = (SDataDispatcher*)initDataSink(DSINK_Dispatch, sizeof(SDataDispatcher), pRoot);
  return (SDataSink*)dispatcher;
}

147
static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) {
148
  SPhyNode* node = (SPhyNode*)validPointer(calloc(1, size));
X
Xiaoyu Wang 已提交
149
  node->info.type = type;
150 151 152
  node->info.name = opTypeToOpName(type);
  if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr) || !toDataBlockSchema(pPlanNode, &(node->targetSchema))) {
    free(node);
153
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
154 155
  }
  return node;
156 157
}

158 159 160 161 162 163 164 165 166 167
static void cleanupPhyNode(SPhyNode* pPhyNode) {
  if (pPhyNode == NULL) {
    return;
  }

  dropOneLevelExprInfo(pPhyNode->pTargets);
  tfree(pPhyNode->targetSchema.pSchema);
  tfree(pPhyNode);
}

168
static SPhyNode* initScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t type, int32_t size) {
H
Haojun Liao 已提交
169 170 171 172 173 174 175 176
  SScanPhyNode* node = (SScanPhyNode*) initPhyNode(pPlanNode, type, size);

  STableMeta *pTableMeta = pTable->pMeta->pTableMeta;
  node->uid       = pTableMeta->uid;
  node->count     = 1;
  node->order     = TSDB_ORDER_ASC;
  node->tableType = pTableMeta->tableType;

177 178 179 180 181 182 183
  return (SPhyNode*)node;
}

static SPhyNode* createPseudoScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) {
  return initScanNode(pPlanNode, pTable, op, sizeof(SScanPhyNode));
}

184
static SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) {
185 186 187 188 189 190
  SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
  return createPseudoScanNode(pPlanNode, pTable, OP_TagScan);
}

static uint8_t getScanFlag(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  // todo
H
Haojun Liao 已提交
191
  return MAIN_SCAN;
192 193
}

H
Haojun Liao 已提交
194 195 196 197
static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pQueryTableInfo, int32_t op) {
  STableScanPhyNode* node = (STableScanPhyNode*)initScanNode(pPlanNode, pQueryTableInfo, op, sizeof(STableScanPhyNode));
  node->scanFlag = getScanFlag(pPlanNode, pQueryTableInfo);
  node->window = pQueryTableInfo->window;
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
  // todo tag cond
  return (SPhyNode*)node;
}


static bool isSystemTable(SQueryTableInfo* pTable) {
  // todo
  return false;
}

static bool needSeqScan(SQueryPlanNode* pPlanNode) {
  // todo
  return false;
}

static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  if (isSystemTable(pTable)) {
    return createPseudoScanNode(pPlanNode, pTable, OP_SystemTableScan);
  } else if (needSeqScan(pPlanNode)) {
    return createUserTableScanNode(pPlanNode, pTable, OP_TableSeqScan);
  }
H
Haojun Liao 已提交
219
  int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_TableScan:OP_StreamScan;
H
Haojun Liao 已提交
220
  return createUserTableScanNode(pPlanNode, pTable, type);
221 222 223
}

static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
224
  SSubplan* subplan = validPointer(calloc(1, sizeof(SSubplan)));
225 226
  subplan->id = pCxt->nextId;
  ++(pCxt->nextId.subplanId);
H
Haojun Liao 已提交
227 228

  subplan->type  = type;
229 230 231
  subplan->level = 0;
  if (NULL != pCxt->pCurrentSubplan) {
    subplan->level = pCxt->pCurrentSubplan->level + 1;
H
Haojun Liao 已提交
232 233
    if (NULL == pCxt->pCurrentSubplan->pChildren) {
      pCxt->pCurrentSubplan->pChildren = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
234
    }
H
Haojun Liao 已提交
235 236

    taosArrayPush(pCxt->pCurrentSubplan->pChildren, &subplan);
237
    subplan->pParents = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
238 239
    taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan);
  }
H
Haojun Liao 已提交
240

241 242
  SArray* currentLevel;
  if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) {
243
    currentLevel = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
244 245 246
    taosArrayPush(pCxt->pDag->pSubplans, &currentLevel);
  } else {
    currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level);
247
  }
H
Haojun Liao 已提交
248

249
  taosArrayPush(currentLevel, &subplan);
250
  pCxt->pCurrentSubplan = subplan;
X
Xiaoyu Wang 已提交
251
  ++(pCxt->pDag->numOfSubplans);
252 253 254
  return subplan;
}

H
Haojun Liao 已提交
255 256 257
static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAddr) {
  pNodeAddr->nodeId = vg->vgId;
  pNodeAddr->epset  = vg->epset;
X
Xiaoyu Wang 已提交
258 259
}

260 261
static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo) {
  SVgroupsInfo* pVgroupList = pTableInfo->pMeta->vgroupList;
262
  for (int32_t i = 0; i < pVgroupList->numOfVgroups; ++i) {
263
    STORE_CURRENT_SUBPLAN(pCxt);
264
    SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
D
dapan 已提交
265
    subplan->msgType   = TDMT_VND_QUERY;
H
Haojun Liao 已提交
266 267

    vgroupInfoToNodeAddr(&(pTableInfo->pMeta->vgroupList->vgroups[i]), &subplan->execNode);
268
    subplan->pNode = createMultiTableScanNode(pPlanNode, pTableInfo);
269
    subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode, subplan->pNode);
270
    RECOVERY_CURRENT_SUBPLAN(pCxt);
271
  }
272
  return pCxt->nextId.templateId++;
X
Xiaoyu Wang 已提交
273 274
}

275 276 277
static SPhyNode* createExchangeNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, uint64_t srcTemplateId) {
  SExchangePhyNode* node = (SExchangePhyNode*)initPhyNode(pPlanNode, OP_Exchange, sizeof(SExchangePhyNode));
  node->srcTemplateId = srcTemplateId;
H
Haojun Liao 已提交
278
  node->pSrcEndPoints = validPointer(taosArrayInit(TARRAY_MIN_SIZE, sizeof(SDownstreamSource)));
279 280
  return (SPhyNode*)node;
}
281

282 283 284
static bool needMultiNodeScan(SQueryTableInfo* pTable) {
  // todo system table, for instance, user_tables
  return (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType);
285 286
}

H
Haojun Liao 已提交
287 288 289 290
// TODO: the SVgroupInfo index
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo, SSubplan* subplan) {
  SVgroupsInfo* pVgroupsInfo = pTableInfo->pMeta->vgroupList;
  vgroupInfoToNodeAddr(&(pVgroupsInfo->vgroups[0]), &subplan->execNode);
H
Haojun Liao 已提交
291
  int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_TableScan:OP_StreamScan;
H
Haojun Liao 已提交
292
  return createUserTableScanNode(pPlanNode, pTableInfo, type);
D
dapan 已提交
293 294
}

295
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
296
  SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
297 298
  if (needMultiNodeScan(pTable)) {
    return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable));
299
  }
300

D
dapan 已提交
301
  return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan);
302 303
}

304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322
static SPhyNode* createSingleTableAgg(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SAggPhyNode* node = (SAggPhyNode*)initPhyNode(pPlanNode, OP_Aggregate, sizeof(SAggPhyNode));
  SGroupbyExpr* pGroupBy = (SGroupbyExpr*)pPlanNode->pExtInfo;
  node->aggAlgo = AGG_ALGO_PLAIN;
  node->aggSplit = AGG_SPLIT_FINAL;
  if (NULL != pGroupBy) {
    node->aggAlgo = AGG_ALGO_HASHED;
    node->pGroupByList = validPointer(taosArrayDup(pGroupBy->columnInfo));
  }
  return (SPhyNode*)node;
}

static SPhyNode* createAggNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  // if (needMultiNodeAgg(pPlanNode)) {

  // }
  return createSingleTableAgg(pCxt, pPlanNode);
}

323
static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
X
Xiaoyu Wang 已提交
324 325 326 327 328
  SPhyNode* node = NULL;
  switch (pPlanNode->info.type) {
    case QNODE_TAGSCAN:
      node = createTagScanNode(pPlanNode);
      break;
H
Haojun Liao 已提交
329
    case QNODE_STREAMSCAN:
X
Xiaoyu Wang 已提交
330
    case QNODE_TABLESCAN:
331
      node = createTableScanNode(pCxt, pPlanNode);
X
Xiaoyu Wang 已提交
332
      break;
333 334 335 336
    case QNODE_AGGREGATE:
    case QNODE_GROUPBY:
      node = createAggNode(pCxt, pPlanNode);
      break;
337
    case QNODE_MODIFY:
X
Xiaoyu Wang 已提交
338 339
      // Insert is not an operator in a physical plan.
      break;
X
Xiaoyu Wang 已提交
340 341 342
    default:
      assert(false);
  }
H
Haojun Liao 已提交
343

X
Xiaoyu Wang 已提交
344
  if (pPlanNode->pChildren != NULL && taosArrayGetSize(pPlanNode->pChildren) > 0) {
X
Xiaoyu Wang 已提交
345
    node->pChildren = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
X
Xiaoyu Wang 已提交
346 347
    size_t size = taosArrayGetSize(pPlanNode->pChildren);
    for(int32_t i = 0; i < size; ++i) {
348
      SPhyNode* child = createPhyNode(pCxt, taosArrayGetP(pPlanNode->pChildren, i));
X
Xiaoyu Wang 已提交
349 350 351
      child->pParent = node;
      taosArrayPush(node->pChildren, &child);
    }
352
  }
H
Haojun Liao 已提交
353

X
Xiaoyu Wang 已提交
354
  return node;
355 356
}

H
Haojun Liao 已提交
357 358 359 360 361
static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SDataPayloadInfo* pPayload = (SDataPayloadInfo*) pPlanNode->pExtInfo;

  size_t numOfVgroups = taosArrayGetSize(pPayload->payload);
  for (int32_t i = 0; i < numOfVgroups; ++i) {
X
Xiaoyu Wang 已提交
362
    STORE_CURRENT_SUBPLAN(pCxt);
X
Xiaoyu Wang 已提交
363
    SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
H
Haojun Liao 已提交
364 365
    SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, i);

H
Haojun Liao 已提交
366
    subplan->execNode.epset = blocks->vg.epset;
367
    subplan->pDataSink  = createDataInserter(pCxt, blocks, NULL);
368 369 370
    subplan->pNode      = NULL;
    subplan->type       = QUERY_TYPE_MODIFY;
    subplan->msgType    = pPayload->msgType;
371
    subplan->id.queryId = pCxt->pDag->queryId;
H
Haojun Liao 已提交
372

X
Xiaoyu Wang 已提交
373
    RECOVERY_CURRENT_SUBPLAN(pCxt);
X
Xiaoyu Wang 已提交
374 375 376
  }
}

377
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
378
  if (QNODE_MODIFY == pRoot->info.type) {
H
Haojun Liao 已提交
379
    splitModificationOpSubPlan(pCxt, pRoot);
X
Xiaoyu Wang 已提交
380
  } else {
D
dapan 已提交
381
    SSubplan* subplan  = initSubplan(pCxt, QUERY_TYPE_SCAN);
X
Xiaoyu Wang 已提交
382
    ++(pCxt->nextId.templateId);
H
Haojun Liao 已提交
383 384 385

    subplan->msgType   = TDMT_VND_QUERY;
    subplan->pNode     = createPhyNode(pCxt, pRoot);
386
    subplan->pDataSink = createDataDispatcher(pCxt, pRoot, subplan->pNode);
X
Xiaoyu Wang 已提交
387
  }
388 389 390
  // todo deal subquery
}

391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417
static void postCreateDag(SQueryPlanNode* pQueryNode, SQueryDag* pDag, SArray* pNodeList) {
  // The exchange operator is not necessary, in case of the stream scan.
  // Here we need to remove it from the DAG.
  if (pQueryNode->info.type == QNODE_STREAMSCAN) {
    SArray* pRootLevel = taosArrayGetP(pDag->pSubplans, 0);
    SSubplan *pSubplan = taosArrayGetP(pRootLevel, 0);

    if (pSubplan->pNode->info.type == OP_Exchange) {
      ASSERT(taosArrayGetSize(pRootLevel) == 1);

      taosArrayRemove(pDag->pSubplans, 0);
      // And then update the number of the subplans.
      pDag->numOfSubplans -= 1;
    }
  } else {
    // Traverse the dag again to acquire the execution node.
    if (pNodeList != NULL) {
      SArray** pSubLevel = taosArrayGetLast(pDag->pSubplans);
      size_t   num = taosArrayGetSize(*pSubLevel);
      for (int32_t j = 0; j < num; ++j) {
        SSubplan* pPlan = taosArrayGetP(*pSubLevel, j);
        taosArrayPush(pNodeList, &pPlan->execNode);
      }
    }
  }
}

418
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, SArray* pNodeList, uint64_t requestId) {
X
Xiaoyu Wang 已提交
419 420 421
  TRY(TSDB_MAX_TAG_CONDITIONS) {
    SPlanContext context = {
      .pCatalog = pCatalog,
H
Haojun Liao 已提交
422
      .pDag     = validPointer(calloc(1, sizeof(SQueryDag))),
X
Xiaoyu Wang 已提交
423
      .pCurrentSubplan = NULL,
424 425
       //The unsigned Id starting from 1 would be better
      .nextId   = {.queryId = requestId, .subplanId = 1, .templateId = 1},
X
Xiaoyu Wang 已提交
426
    };
H
Haojun Liao 已提交
427

X
Xiaoyu Wang 已提交
428
    *pDag = context.pDag;
429 430
    context.pDag->queryId = requestId;

431
    context.pDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
X
Xiaoyu Wang 已提交
432 433 434 435 436 437
    createSubplanByLevel(&context, pQueryNode);
  } CATCH(code) {
    CLEANUP_EXECUTE();
    terrno = code;
    return TSDB_CODE_FAILED;
  } END_TRY
438

439
  postCreateDag(pQueryNode, *pDag, pNodeList);
440
  return TSDB_CODE_SUCCESS;
441
}
X
Xiaoyu Wang 已提交
442

H
Haojun Liao 已提交
443
void setExchangSourceNode(uint64_t templateId, SDownstreamSource *pSource, SPhyNode* pNode) {
444 445 446 447 448 449
  if (NULL == pNode) {
    return;
  }
  if (OP_Exchange == pNode->info.type) {
    SExchangePhyNode* pExchange = (SExchangePhyNode*)pNode;
    if (templateId == pExchange->srcTemplateId) {
H
Haojun Liao 已提交
450
      taosArrayPush(pExchange->pSrcEndPoints, pSource);
451 452
    }
  }
H
Haojun Liao 已提交
453

454 455 456
  if (pNode->pChildren != NULL) {
    size_t size = taosArrayGetSize(pNode->pChildren);
    for(int32_t i = 0; i < size; ++i) {
H
Haojun Liao 已提交
457
      setExchangSourceNode(templateId, pSource, taosArrayGetP(pNode->pChildren, i));
458 459 460 461
    }
  }
}

H
Haojun Liao 已提交
462 463
void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) {
  setExchangSourceNode(templateId, pSource, subplan->pNode);
X
Xiaoyu Wang 已提交
464
}
465 466 467 468 469 470

static void destroyDataSinkNode(SDataSink* pSinkNode) {
  if (pSinkNode == NULL) {
    return;
  }

X
Xiaoyu Wang 已提交
471
  if (queryNodeType(pSinkNode) == DSINK_Dispatch) {
472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490
    SDataDispatcher* pDdSink = (SDataDispatcher*)pSinkNode;
    tfree(pDdSink->sink.schema.pSchema);
  }

  tfree(pSinkNode);
}

void qDestroySubplan(SSubplan* pSubplan) {
  if (pSubplan == NULL) {
    return;
  }

  taosArrayDestroy(pSubplan->pChildren);
  taosArrayDestroy(pSubplan->pParents);
  destroyDataSinkNode(pSubplan->pDataSink);
  cleanupPhyNode(pSubplan->pNode);

  tfree(pSubplan);
}