physicalPlan.c 15.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "plannerInt.h"
X
Xiaoyu Wang 已提交
17
#include "exception.h"
18
#include "parser.h"
19

20 21 22
/*
 * Save / restore helpers for the context's current subplan.
 *
 * STORE_CURRENT_SUBPLAN declares a local variable named `_` that holds the
 * current subplan, so it must be used as a declaration statement and at most
 * once per scope. RECOVERY_CURRENT_SUBPLAN writes `_` back and therefore must
 * appear in the same scope as the matching STORE_CURRENT_SUBPLAN.
 *
 * The macro argument is parenthesized so that any pointer expression can be
 * passed safely.
 */
#define STORE_CURRENT_SUBPLAN(cxt) SSubplan* _ = (cxt)->pCurrentSubplan
#define RECOVERY_CURRENT_SUBPLAN(cxt) (cxt)->pCurrentSubplan = _

23 24 25 26 27 28 29
/* Working state shared by the helpers in this file while a query DAG is being built. */
typedef struct SPlanContext {
  // Catalog handle supplied by the caller of createDag().
  struct SCatalog* pCatalog;
  // DAG under construction; initSubplan() registers every new subplan in it.
  struct SQueryDag* pDag;
  // Subplan that newly created subplans are linked under; NULL when there is none.
  SSubplan* pCurrentSubplan;
  // Id copied into the next subplan created by initSubplan().
  SSubplanId nextId;
} SPlanContext;

30 31 32 33 34 35
/*
 * Operator names indexed by operator type (see opTypeToOpName()).
 * Slot 0 is the "Unknown" placeholder; the remaining entries come from
 * plannerOp.h, included with INCLUDE_AS_NAME defined (presumably this makes
 * each operator declaration expand to its name string -- confirm in plannerOp.h).
 */
static const char* gOpName[] = {
  "Unknown",
#define INCLUDE_AS_NAME
#include "plannerOp.h"
#undef INCLUDE_AS_NAME
};
X
Xiaoyu Wang 已提交
36

37
static void* validPointer(void* p) {
X
Xiaoyu Wang 已提交
38 39 40 41 42 43
  if (NULL == p) {
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
  }
  return p;
}

44 45 46 47
/*
 * Maps an operator type to its printable name.
 *
 * @param type  index into gOpName.
 * @return the operator name; gOpName[0] ("Unknown") when `type` is negative or
 *         beyond the end of the table, instead of reading out of bounds.
 */
const char* opTypeToOpName(int32_t type) {
  const int32_t numOfNames = (int32_t)(sizeof(gOpName) / sizeof(gOpName[0]));
  if (type < 0 || type >= numOfNames) {
    return gOpName[0];
  }
  return gOpName[type];
}

48 49
/*
 * Reverse lookup of opTypeToOpName(): finds the operator type whose name
 * equals `name`.
 *
 * @param name  operator name; NULL is treated as "not found" rather than being
 *              passed to strcmp().
 * @return the index of the matching gOpName entry (slot 0 is skipped), or
 *         OP_Unknown when nothing matches.
 */
int32_t opNameToOpType(const char* name) {
  if (NULL == name) {
    return OP_Unknown;
  }
  const int32_t numOfNames = (int32_t)(sizeof(gOpName) / sizeof(gOpName[0]));
  for (int32_t i = 1; i < numOfNames; ++i) {
    if (0 == strcmp(name, gOpName[i])) {
      return i;
    }
  }
  return OP_Unknown;
}
X
Xiaoyu Wang 已提交
56

57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
/*
 * Returns the printable name of a data sink type: "Dispatch", "Insert",
 * or "Unknown" for any other value.
 */
const char* dsinkTypeToDsinkName(int32_t type) {
  if (DSINK_Dispatch == type) {
    return "Dispatch";
  }
  if (DSINK_Insert == type) {
    return "Insert";
  }
  return "Unknown";
}

/*
 * Reverse of dsinkTypeToDsinkName(): translates "Dispatch" / "Insert" into the
 * matching DSINK_* constant and anything else into DSINK_Unknown.
 */
int32_t dsinkNameToDsinkType(const char* name) {
  int32_t type = DSINK_Unknown;
  if (strcmp(name, "Dispatch") == 0) {
    type = DSINK_Dispatch;
  } else if (strcmp(name, "Insert") == 0) {
    type = DSINK_Insert;
  }
  return type;
}

78 79 80 81 82 83 84 85 86 87
/*
 * Copies `src` into `dst`, giving `dst` its own freshly allocated slot array.
 *
 * @return true on success; false when the slot array cannot be allocated
 *         (in that case only dst->pSchema has been written, with NULL).
 */
static bool copySchema(SDataBlockSchema* dst, const SDataBlockSchema* src) {
  const size_t slotBytes = sizeof(SSlotSchema) * src->numOfCols;

  dst->pSchema = malloc(slotBytes);
  if (dst->pSchema != NULL) {
    memcpy(dst->pSchema, src->pSchema, slotBytes);
    dst->numOfCols     = src->numOfCols;
    dst->resultRowSize = src->resultRowSize;
    dst->precision     = src->precision;
    return true;
  }
  return false;
}

90
/*
 * Builds the output schema of a logical plan node: one slot per expression,
 * copied from the expression's result schema, with resultRowSize set to the
 * sum of the slot widths.
 *
 * @return true on success, false when the slot array cannot be allocated.
 */
static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) {
  dataBlockSchema->numOfCols = pPlanNode->numOfExpr;
  dataBlockSchema->pSchema = malloc(sizeof(SSlotSchema) * pPlanNode->numOfExpr);
  if (dataBlockSchema->pSchema == NULL) {
    return false;
  }

  dataBlockSchema->resultRowSize = 0;
  int32_t slot = 0;
  while (slot < pPlanNode->numOfExpr) {
    SExprInfo* pExpr = taosArrayGetP(pPlanNode->pExpr, slot);
    memcpy(&dataBlockSchema->pSchema[slot], &pExpr->base.resSchema, sizeof(SSlotSchema));
    dataBlockSchema->resultRowSize += dataBlockSchema->pSchema[slot].bytes;
    ++slot;
  }

  return true;
}

/*
 * Clones the expression array `src` into a newly created array stored in *dst.
 * A NULL or empty `src` is a successful no-op that leaves *dst untouched.
 *
 * @return false when the new array cannot be created or copyAllExprInfo()
 *         does not report TSDB_CODE_SUCCESS; true otherwise. On a copy
 *         failure *dst still refers to the new array.
 */
static bool cloneExprArray(SArray** dst, SArray* src) {
  if (src == NULL) {
    return true;
  }

  const size_t numOfExprs = taosArrayGetSize(src);
  if (numOfExprs == 0) {
    return true;
  }

  SArray* pClone = taosArrayInit(numOfExprs, POINTER_BYTES);
  *dst = pClone;
  if (pClone == NULL) {
    return false;
  }

  return TSDB_CODE_SUCCESS == copyAllExprInfo(pClone, src, true);
}

123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
/*
 * Allocates a zeroed data sink of `size` bytes and fills in its type and name.
 * When `pRoot` is given, the sink also receives a private copy of the root
 * node's target schema.
 *
 * Raises TSDB_CODE_TSC_OUT_OF_MEMORY (via validPointer / THROW) when the sink
 * or the schema copy cannot be allocated.
 */
static SDataSink* initDataSink(int32_t type, int32_t size, const SPhyNode* pRoot) {
  SDataSink* pSink = (SDataSink*)validPointer(calloc(1, size));
  pSink->info.type = type;
  pSink->info.name = dsinkTypeToDsinkName(type);

  if (pRoot == NULL) {
    return pSink;
  }
  if (copySchema(&pSink->schema, &pRoot->targetSchema)) {
    return pSink;
  }

  // Schema copy failed: release the half-built sink before reporting the error.
  tfree(pSink);
  THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
  return pSink;
}

/*
 * Creates an insert sink for one block group. The table count and size are
 * copied from `pBlocks`, and the payload pointer is swapped into the sink, so
 * afterwards `pBlocks->pData` holds the sink's previous (zero-initialised) value.
 * `pCxt` is currently unused.
 */
static SDataSink* createDataInserter(SPlanContext* pCxt, SVgDataBlocks* pBlocks, const SPhyNode* pRoot) {
  SDataSink* pSink = initDataSink(DSINK_Insert, sizeof(SDataInserter), pRoot);
  SDataInserter* pInserter = (SDataInserter*)pSink;

  pInserter->numOfTables = pBlocks->numOfTables;
  pInserter->size = pBlocks->size;
  TSWAP(pInserter->pData, pBlocks->pData, char*);

  return pSink;
}

/*
 * Creates a dispatch sink whose schema is copied from `pRoot` (when non-NULL).
 * `pCxt` and `pPlanNode` are currently unused.
 */
static SDataSink* createDataDispatcher(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, const SPhyNode* pRoot) {
  return initDataSink(DSINK_Dispatch, sizeof(SDataDispatcher), pRoot);
}

147
/*
 * Allocates a zeroed physical node of `size` bytes and initialises the common
 * SPhyNode part: type, name, a clone of the logical node's expressions
 * (pTargets) and the derived target schema.
 *
 * @param pPlanNode  logical node the physical node is derived from.
 * @param type       operator type stored in the node and used to look up its name.
 * @param size       allocation size of the concrete node struct.
 * @return the new node; never NULL.
 *
 * Raises TSDB_CODE_TSC_OUT_OF_MEMORY (via validPointer / THROW) on failure.
 * Everything allocated so far is released first, including a target array
 * that was already (partly) cloned.
 */
static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) {
  SPhyNode* node = (SPhyNode*)validPointer(calloc(1, size));
  node->info.type = type;
  node->info.name = opTypeToOpName(type);
  if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr) || !toDataBlockSchema(pPlanNode, &(node->targetSchema))) {
    // pTargets can be non-NULL on either failure path; release it the same
    // way cleanupPhyNode() does so it is not leaked with the node.
    if (NULL != node->pTargets) {
      dropOneLevelExprInfo(node->pTargets);
    }
    free(node);
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
  }
  return node;
}

158 159 160 161 162 163 164 165 166 167
/*
 * Releases a physical node together with the subtree hanging off it.
 *
 * Child nodes are created and attached by createPhyNode(), so they are
 * released here recursively along with the pChildren pointer array; before
 * this, only the root of the tree was freed. NULL is accepted and ignored.
 *
 * NOTE(review): operator-specific members (e.g. an exchange node's
 * pSrcEndPoints, an aggregate node's pGroupByList) are not released here --
 * confirm where they are expected to be freed.
 */
static void cleanupPhyNode(SPhyNode* pPhyNode) {
  if (pPhyNode == NULL) {
    return;
  }

  if (pPhyNode->pChildren != NULL) {
    size_t numOfChildren = taosArrayGetSize(pPhyNode->pChildren);
    for (size_t i = 0; i < numOfChildren; ++i) {
      cleanupPhyNode((SPhyNode*)taosArrayGetP(pPhyNode->pChildren, i));
    }
    taosArrayDestroy(pPhyNode->pChildren);
    pPhyNode->pChildren = NULL;
  }

  dropOneLevelExprInfo(pPhyNode->pTargets);
  tfree(pPhyNode->targetSchema.pSchema);
  tfree(pPhyNode);
}

168
/*
 * Creates a scan node of the given operator type and size, and fills the
 * common scan fields from the table meta: uid and table type, with a scan
 * count of 1 and ascending order.
 */
static SPhyNode* initScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t type, int32_t size) {
  SPhyNode* pBase = initPhyNode(pPlanNode, type, size);
  SScanPhyNode* pScan = (SScanPhyNode*)pBase;
  STableMeta* pMeta = pTable->pMeta->pTableMeta;

  pScan->uid = pMeta->uid;
  pScan->count = 1;
  pScan->order = TSDB_ORDER_ASC;
  pScan->tableType = pMeta->tableType;

  return pBase;
}

/* Creates a plain SScanPhyNode for operator `op`, with no fields beyond the common scan part. */
static SPhyNode* createPseudoScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) {
  const int32_t nodeSize = sizeof(SScanPhyNode);
  return initScanNode(pPlanNode, pTable, op, nodeSize);
}

184
/* Creates an OP_TagScan node; the table info is taken from the plan node's pExtInfo. */
static SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) {
  return createPseudoScanNode(pPlanNode, (SQueryTableInfo*)pPlanNode->pExtInfo, OP_TagScan);
}

/*
 * Scan flag for a table scan. Placeholder: both arguments are ignored and
 * MAIN_SCAN is always returned.
 */
static uint8_t getScanFlag(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  // todo: derive the flag from the plan node / table
  const uint8_t flag = MAIN_SCAN;
  return flag;
}

H
Haojun Liao 已提交
194 195 196 197
/*
 * Creates a table scan node for operator `op` and fills the table-scan fields:
 * the scan flag and the time window taken from the table info.
 */
static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pQueryTableInfo, int32_t op) {
  SPhyNode* pBase = initScanNode(pPlanNode, pQueryTableInfo, op, sizeof(STableScanPhyNode));
  STableScanPhyNode* pTableScan = (STableScanPhyNode*)pBase;

  pTableScan->scanFlag = getScanFlag(pPlanNode, pQueryTableInfo);
  pTableScan->window = pQueryTableInfo->window;
  // todo tag cond

  return pBase;
}


/* Placeholder: system tables are not detected yet, so this always reports false. */
static bool isSystemTable(SQueryTableInfo* pTable) {
  // todo
  (void)pTable;
  return false;
}

/* Placeholder: sequential scans are never requested yet, so this always reports false. */
static bool needSeqScan(SQueryPlanNode* pPlanNode) {
  // todo
  (void)pPlanNode;
  return false;
}

/*
 * Picks the scan operator used for one vgroup of a multi-node scan:
 * OP_SystemTableScan for system tables, OP_TableSeqScan when a sequential
 * scan is needed, OP_DataBlocksOptScan otherwise.
 */
static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
  if (isSystemTable(pTable)) {
    return createPseudoScanNode(pPlanNode, pTable, OP_SystemTableScan);
  }

  const int32_t op = needSeqScan(pPlanNode) ? OP_TableSeqScan : OP_DataBlocksOptScan;
  return createUserTableScanNode(pPlanNode, pTable, op);
}

/*
 * Creates a subplan of the given type and wires it into the DAG:
 *  - it takes the context's next id, and the subplan id counter is advanced;
 *  - if the context has a current subplan, the new one becomes its child
 *    (one level deeper) and records it as its parent;
 *  - it is appended to the DAG's per-level list, creating the level on demand;
 *  - it becomes the context's current subplan and the DAG's subplan count grows.
 *
 * Allocation failures are raised through validPointer().
 */
static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
  SSubplan* pNew = validPointer(calloc(1, sizeof(SSubplan)));
  pNew->id = pCxt->nextId;
  ++(pCxt->nextId.subplanId);

  pNew->type = type;
  pNew->level = 0;

  SSubplan* pParent = pCxt->pCurrentSubplan;
  if (pParent != NULL) {
    pNew->level = pParent->level + 1;
    if (pParent->pChildren == NULL) {
      pParent->pChildren = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    }

    taosArrayPush(pParent->pChildren, &pNew);
    pNew->pParents = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    taosArrayPush(pNew->pParents, &pParent);
  }

  // Locate the list for this level, adding a new level when the DAG has none yet.
  SArray* pLevels = pCxt->pDag->pSubplans;
  SArray* pLevel;
  if (pNew->level < taosArrayGetSize(pLevels)) {
    pLevel = taosArrayGetP(pLevels, pNew->level);
  } else {
    pLevel = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    taosArrayPush(pLevels, &pLevel);
  }

  taosArrayPush(pLevel, &pNew);
  pCxt->pCurrentSubplan = pNew;
  ++(pCxt->pDag->numOfSubplans);
  return pNew;
}

X
Xiaoyu Wang 已提交
254 255
/*
 * Fills an execution-node address from a vgroup info record: node id,
 * in-use index, endpoint count and every endpoint address.
 */
static void vgroupInfoToEpSet(const SVgroupInfo* vg, SQueryNodeAddr* execNode) {
  execNode->nodeId   = vg->vgId;
  execNode->inUse    = vg->inUse;
  execNode->numOfEps = vg->numOfEps;

  int8_t ep = 0;
  while (ep < vg->numOfEps) {
    execNode->epAddr[ep] = vg->epAddr[ep];
    ++ep;
  }
}

X
Xiaoyu Wang 已提交
264 265 266 267
/*
 * Fills an execution-node address from a vgroup message: node id, endpoint
 * count and every endpoint address. The in-use index is always set to 0.
 */
static void vgroupMsgToEpSet(const SVgroupMsg* vg, SQueryNodeAddr* execNode) {
  execNode->nodeId   = vg->vgId;
  execNode->inUse    = 0; // todo
  execNode->numOfEps = vg->numOfEps;

  int8_t ep = 0;
  while (ep < vg->numOfEps) {
    execNode->epAddr[ep] = vg->epAddr[ep];
    ++ep;
  }
}

274 275
/*
 * Creates one scan subplan per vgroup of the table. Each gets its own scan
 * node and dispatch sink and is attached under the subplan that is current on
 * entry; the context's current subplan is restored after every iteration.
 *
 * @return the template id in effect on entry; the context's template id
 *         counter is advanced afterwards.
 */
static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo) {
  SVgroupsInfo* pVgroups = pTableInfo->pMeta->vgroupList;

  for (int32_t vg = 0; vg < pVgroups->numOfVgroups; ++vg) {
    STORE_CURRENT_SUBPLAN(pCxt);

    SSubplan* pScanPlan = initSubplan(pCxt, QUERY_TYPE_SCAN);
    pScanPlan->msgType = TDMT_VND_QUERY;
    vgroupMsgToEpSet(&(pVgroups->vgroups[vg]), &pScanPlan->execNode);
    pScanPlan->pNode = createMultiTableScanNode(pPlanNode, pTableInfo);
    pScanPlan->pDataSink = createDataDispatcher(pCxt, pPlanNode, pScanPlan->pNode);

    RECOVERY_CURRENT_SUBPLAN(pCxt);
  }

  return pCxt->nextId.templateId++;
}

288 289 290
/*
 * Creates an exchange node that will receive data from the subplans created
 * under template `srcTemplateId`. The source endpoint list starts empty and is
 * filled later by setExchangSourceNode().
 *
 * Raises TSDB_CODE_TSC_OUT_OF_MEMORY when an allocation fails. The partly built
 * node is released first, so it is not leaked when the endpoint array cannot
 * be created. `pCxt` is currently unused.
 */
static SPhyNode* createExchangeNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, uint64_t srcTemplateId) {
  SExchangePhyNode* node = (SExchangePhyNode*)initPhyNode(pPlanNode, OP_Exchange, sizeof(SExchangePhyNode));
  node->srcTemplateId = srcTemplateId;
  node->pSrcEndPoints = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SDownstreamSource));
  if (NULL == node->pSrcEndPoints) {
    cleanupPhyNode((SPhyNode*)node);
    THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
  }
  return (SPhyNode*)node;
}
294

295 296 297
/* A scan is split across nodes exactly when the table is a super table. */
static bool needMultiNodeScan(SQueryTableInfo* pTable) {
  // todo system table, for instance, user_tables
  const STableMeta* pTableMeta = pTable->pMeta->pTableMeta;
  return TSDB_SUPER_TABLE == pTableMeta->tableType;
}

D
dapan 已提交
300 301 302
/*
 * Creates the scan node for a table served by a single vgroup. The subplan's
 * execution node is set from the first vgroup of the table, and the operator
 * is OP_TableScan for QNODE_TABLESCAN plan nodes, OP_StreamScan otherwise.
 */
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, SSubplan* subplan) {
  vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[0]), &subplan->execNode);

  int32_t op = OP_StreamScan;
  if (QNODE_TABLESCAN == pPlanNode->info.type) {
    op = OP_TableScan;
  }
  return createUserTableScanNode(pPlanNode, pTable, op);
}

307
/*
 * Creates the physical node for a table/stream scan. Super tables are split
 * into per-vgroup subplans and represented here by an exchange node; any other
 * table is scanned directly inside the current subplan.
 */
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SQueryTableInfo* pTableInfo = (SQueryTableInfo*)pPlanNode->pExtInfo;

  if (!needMultiNodeScan(pTableInfo)) {
    return createSingleTableScanNode(pPlanNode, pTableInfo, pCxt->pCurrentSubplan);
  }

  uint64_t srcTemplateId = splitSubplanByTable(pCxt, pPlanNode, pTableInfo);
  return createExchangeNode(pCxt, pPlanNode, srcTemplateId);
}

315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333
/*
 * Creates a final-stage aggregate node. Without a group-by clause the plain
 * algorithm is used; with one, the hashed algorithm is selected and the
 * group-by column list is duplicated into the node.
 *
 * Raises TSDB_CODE_TSC_OUT_OF_MEMORY when an allocation fails. The partly built
 * node is released first, so it is not leaked when the group-by list cannot be
 * duplicated. `pCxt` is currently unused.
 */
static SPhyNode* createSingleTableAgg(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SAggPhyNode* node = (SAggPhyNode*)initPhyNode(pPlanNode, OP_Aggregate, sizeof(SAggPhyNode));
  SGroupbyExpr* pGroupBy = (SGroupbyExpr*)pPlanNode->pExtInfo;
  node->aggAlgo = AGG_ALGO_PLAIN;
  node->aggSplit = AGG_SPLIT_FINAL;
  if (NULL != pGroupBy) {
    node->aggAlgo = AGG_ALGO_HASHED;
    node->pGroupByList = taosArrayDup(pGroupBy->columnInfo);
    if (NULL == node->pGroupByList) {
      cleanupPhyNode((SPhyNode*)node);
      THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
    }
  }
  return (SPhyNode*)node;
}

/*
 * Creates the physical aggregate node. Multi-node aggregation is not
 * implemented yet, so every aggregate is planned as a single-table aggregate.
 */
static SPhyNode* createAggNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  // todo: branch on needMultiNodeAgg(pPlanNode) once it exists
  SPhyNode* pAgg = createSingleTableAgg(pCxt, pPlanNode);
  return pAgg;
}

334
/*
 * Recursively converts a logical plan node (and its children) into a physical
 * node tree.
 *
 * @return the physical node, or NULL when the logical node has no physical
 *         counterpart (QNODE_MODIFY, or an unknown type in builds where
 *         assert() is compiled out).
 *
 * Fixes over the previous version:
 *  - a NULL result is returned immediately instead of being dereferenced when
 *    the logical node has children;
 *  - the children array allocation is checked through validPointer();
 *  - a child without a physical counterpart is skipped instead of dereferenced.
 */
static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SPhyNode* node = NULL;
  switch (pPlanNode->info.type) {
    case QNODE_TAGSCAN:
      node = createTagScanNode(pPlanNode);
      break;
    case QNODE_STREAMSCAN:
    case QNODE_TABLESCAN:
      node = createTableScanNode(pCxt, pPlanNode);
      break;
    case QNODE_AGGREGATE:
    case QNODE_GROUPBY:
      node = createAggNode(pCxt, pPlanNode);
      break;
    case QNODE_MODIFY:
      // Insert is not an operator in a physical plan.
      break;
    default:
      assert(false);
  }

  // Nothing to attach children to.
  if (NULL == node) {
    return NULL;
  }

  if (NULL == pPlanNode->pChildren) {
    return node;
  }
  size_t size = taosArrayGetSize(pPlanNode->pChildren);
  if (0 == size) {
    return node;
  }

  node->pChildren = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
  for (size_t i = 0; i < size; ++i) {
    SPhyNode* child = createPhyNode(pCxt, taosArrayGetP(pPlanNode->pChildren, i));
    if (NULL == child) {
      continue;
    }
    child->pParent = node;
    taosArrayPush(node->pChildren, &child);
  }

  return node;
}

H
Haojun Liao 已提交
368 369 370 371 372
/*
 * Creates one QUERY_TYPE_MODIFY subplan per payload entry of a modification
 * plan node. Each subplan has no operator tree, only an insert sink that takes
 * over the entry's data, and is addressed to the entry's vgroup. The context's
 * current subplan is restored after every iteration.
 */
static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
  SDataPayloadInfo* pPayloadInfo = (SDataPayloadInfo*)pPlanNode->pExtInfo;
  size_t numOfBlocks = taosArrayGetSize(pPayloadInfo->payload);

  for (int32_t idx = 0; idx < numOfBlocks; ++idx) {
    STORE_CURRENT_SUBPLAN(pCxt);

    SSubplan* pModifyPlan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
    SVgDataBlocks* pVgBlocks = (SVgDataBlocks*)taosArrayGetP(pPayloadInfo->payload, idx);

    vgroupInfoToEpSet(&pVgBlocks->vg, &pModifyPlan->execNode);
    pModifyPlan->pDataSink = createDataInserter(pCxt, pVgBlocks, NULL);
    pModifyPlan->pNode = NULL;
    pModifyPlan->type = QUERY_TYPE_MODIFY;
    pModifyPlan->msgType = pPayloadInfo->msgType;
    pModifyPlan->id.queryId = pCxt->pDag->queryId;

    RECOVERY_CURRENT_SUBPLAN(pCxt);
  }
}

388
/*
 * Entry point of subplan construction. A modification root is split into
 * per-vgroup insert subplans; any other root becomes a scan subplan holding the
 * physical node tree and a dispatch sink.
 */
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
  if (QNODE_MODIFY == pRoot->info.type) {
    splitModificationOpSubPlan(pCxt, pRoot);
    return;
  }

  SSubplan* pRootPlan = initSubplan(pCxt, QUERY_TYPE_SCAN);
  ++(pCxt->nextId.templateId);

  pRootPlan->msgType = TDMT_VND_QUERY;
  pRootPlan->pNode = createPhyNode(pCxt, pRoot);
  pRootPlan->pDataSink = createDataDispatcher(pCxt, pRoot, pRootPlan->pNode);
  // todo deal subquery
}

402
/*
 * Builds the query DAG for a logical plan.
 *
 * @param pQueryNode  root of the logical plan.
 * @param pCatalog    catalog handle stored in the planning context.
 * @param pDag        out: receives the newly allocated DAG as soon as it has
 *                    been allocated, so it may refer to a partly built DAG
 *                    when the function fails.
 * @param pNodeList   optional out: when non-NULL, the execution node of every
 *                    subplan on the last DAG level is appended to it.
 * @param requestId   used as the query id of the DAG and of every subplan id.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_FAILED with terrno set to the code
 *         raised during planning.
 *
 * The last-level traversal tolerates a DAG without any level (for example a
 * modification plan with an empty payload) instead of dereferencing the
 * missing level.
 */
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, SArray* pNodeList, uint64_t requestId) {
  TRY(TSDB_MAX_TAG_CONDITIONS) {
    SPlanContext context = {
      .pCatalog = pCatalog,
      .pDag     = validPointer(calloc(1, sizeof(SQueryDag))),
      .pCurrentSubplan = NULL,
       //The unsigned Id starting from 1 would be better
      .nextId   = {.queryId = requestId, .subplanId = 1, .templateId = 1},
    };

    *pDag = context.pDag;
    context.pDag->queryId = requestId;

    context.pDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
    createSubplanByLevel(&context, pQueryNode);
  } CATCH(code) {
    CLEANUP_EXECUTE();
    terrno = code;
    return TSDB_CODE_FAILED;
  } END_TRY

  // traverse the dag again to acquire the execution node.
  if (pNodeList != NULL && taosArrayGetSize((*pDag)->pSubplans) > 0) {
    SArray** pSubLevel = taosArrayGetLast((*pDag)->pSubplans);
    if (pSubLevel != NULL && *pSubLevel != NULL) {
      size_t num = taosArrayGetSize(*pSubLevel);
      for (size_t j = 0; j < num; ++j) {
        SSubplan* pPlan = taosArrayGetP(*pSubLevel, j);
        taosArrayPush(pNodeList, &pPlan->execNode);
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
435

H
Haojun Liao 已提交
436
/*
 * Walks the physical node tree rooted at `pNode` and appends `pSource` to the
 * endpoint list of every exchange node whose source template id equals
 * `templateId`. A NULL `pNode` is ignored.
 */
void setExchangSourceNode(uint64_t templateId, SDownstreamSource *pSource, SPhyNode* pNode) {
  if (pNode == NULL) {
    return;
  }

  if (pNode->info.type == OP_Exchange) {
    SExchangePhyNode* pExchangeNode = (SExchangePhyNode*)pNode;
    if (pExchangeNode->srcTemplateId == templateId) {
      taosArrayPush(pExchangeNode->pSrcEndPoints, pSource);
    }
  }

  if (pNode->pChildren == NULL) {
    return;
  }

  size_t numOfChildren = taosArrayGetSize(pNode->pChildren);
  for (int32_t c = 0; c < numOfChildren; ++c) {
    setExchangSourceNode(templateId, pSource, taosArrayGetP(pNode->pChildren, c));
  }
}

H
Haojun Liao 已提交
455 456
/*
 * Registers `pSource` as a data source for the matching exchange nodes
 * (by `templateId`) in the subplan's physical node tree.
 */
void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) {
  SPhyNode* pRoot = subplan->pNode;
  setExchangSourceNode(templateId, pSource, pRoot);
}
458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483

/*
 * Releases a data sink. For dispatch sinks the copied schema slot array is
 * freed as well. NULL is accepted and ignored.
 *
 * NOTE(review): the payload an insert sink takes over in createDataInserter()
 * (pData) is not released here -- confirm who owns it.
 */
static void destroyDataSinkNode(SDataSink* pSinkNode) {
  if (pSinkNode != NULL) {
    if (DSINK_Dispatch == nodeType(pSinkNode)) {
      SDataDispatcher* pDispatcher = (SDataDispatcher*)pSinkNode;
      tfree(pDispatcher->sink.schema.pSchema);
    }

    tfree(pSinkNode);
  }
}

/*
 * Destroys a subplan: its child/parent link arrays, its data sink and its
 * physical node, and finally the subplan itself. NULL is accepted and ignored.
 * The subplans referenced by the link arrays are not destroyed here.
 */
void qDestroySubplan(SSubplan* pSubplan) {
  if (pSubplan != NULL) {
    taosArrayDestroy(pSubplan->pChildren);
    taosArrayDestroy(pSubplan->pParents);
    destroyDataSinkNode(pSubplan->pDataSink);
    cleanupPhyNode(pSubplan->pNode);

    tfree(pSubplan);
  }
}